Hi,

Sorry for the delay, I couldn't get to an answer earlier. I do understand your point perfectly. I just have a different perspective on what is going on. The most crucial piece of abstraction, the KTable, is falling apart, and that materializes (no pun intended) itself in many problems.

1. Too many overloads:
Currently, whenever a KTable is the result of an operation, that operation gets an overload with a stateStoreName and one with a StateStoreSupplier in case people want to query the table. This is what produces two thirds of the overloaded methods right now (not counting methods returning KStream).

2. Code copy and pasting:
Almost all KTableProcessorSuppliers have the same block of (if (name != null) store.put(k, v)).

3. Runtime inefficiencies:
Each queryable table almost instantly requires another store, which holds data equivalent to that of the upstream KTables.

So I really see us tackling only the first problem currently, which in my opinion is too short-sighted to settle on a public API. This is why I want to tackle our approach to IQ first, as it seems to me to be the most disruptive thing and the cause of most problems.

The Plan:

Tables from a topic, from a KStream (I don't even like this one, but it is probably needed for some kind of enhanced flexibility) or from aggregations would be the only KTables that get associated with a state store (through their processors). For these operations one could keep the "StateStoreSupplier" overload, but not the "queryable state store" overload. From this point on the KTable abstraction would be considered restored. All the overloads of join and through with respect to IQ would go away. Maybe "through" would go away completely.

The method I would add is for a table to get a query handle. This query handle will underneath remember its table's processor name. To access the data from IQ we would not rely on the "per processor state store" but go the usual path through the ValueGetterSupplier.

*Note:* We do not necessarily have a Serde for V, especially after mapValues, and also not for any intermediate data types. It would be each KTableProcessor's job to provide a serialized version of upstream data types. KTableKTableJoin would need to bring a JoinInputSerializer<V1,V2> that would serialize both upstream values for transport across boxes.
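
A minimal sketch of the query-handle idea, in plain Java and without any Kafka classes. Every name here (ValueGetter, QueryHandle, SketchTable, fromStore, queryHandle) is a hypothetical stand-in chosen for illustration, not existing Streams API. It only shows the read path: the source table owns the single store, and a derived table answers queries by going through the upstream getter instead of keeping a second copy. Serialization for transport across boxes is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the value-getter idea: look up a table's
// current value for a key without the table owning a store itself.
interface ValueGetter<K, V> {
    V get(K key);
}

// Hypothetical query handle: remembers the processor name and reads
// through the getter chain instead of a per-processor store.
final class QueryHandle<K, V> {
    private final String processorName;
    private final ValueGetter<K, V> getter;

    QueryHandle(String processorName, ValueGetter<K, V> getter) {
        this.processorName = processorName;
        this.getter = getter;
    }

    String processorName() { return processorName; }

    V get(K key) { return getter.get(key); }
}

// Toy table: only the source table is backed by a store (a Map here).
final class SketchTable<K, V> {
    private final String processorName;
    private final ValueGetter<K, V> getter;

    private SketchTable(String processorName, ValueGetter<K, V> getter) {
        this.processorName = processorName;
        this.getter = getter;
    }

    // "Table from topic": the only place where a store is attached.
    static <K, V> SketchTable<K, V> fromStore(String processorName, Map<K, V> store) {
        return new SketchTable<K, V>(processorName, store::get);
    }

    // Derived table: no store of its own, values are computed on read.
    <VR> SketchTable<K, VR> mapValues(String processorName, Function<V, VR> mapper) {
        ValueGetter<K, V> upstream = this.getter;
        return new SketchTable<K, VR>(processorName, key -> {
            V value = upstream.get(key);
            return value == null ? null : mapper.apply(value);
        });
    }

    QueryHandle<K, V> queryHandle() {
        return new QueryHandle<K, V>(processorName, getter);
    }
}

class QueryHandleSketch {
    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>();
        store.put("a", 2);
        SketchTable<String, Integer> source = SketchTable.fromStore("source", store);
        QueryHandle<String, String> handle =
            source.mapValues("mapped", v -> "v" + v).queryHandle();
        System.out.println(handle.processorName() + " -> " + handle.get("a"));
    }
}
```

In this sketch the mapped table never writes anything, so the data exists exactly once, in the source store.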

This first step would kill all the "store name" based overloads plus many state store overloads. It would also avoid the bloated copy-pasting in each KTableProcessor for maintaining the store, and it would make the runtime more efficient because the same data is no longer stored twice just to be accessible from IQ. That tackles problem 1, but also the other two problems mentioned above.

From here, roughly 3 or 4 methods (from KStream, topic or aggregate) would still be stuck with a StateStoreSupplier overload. For me, this is quite an improvement already. To reduce the overloads further, I am thinking of adding a nullable Properties argument to these operations. If people want to use all defaults, they could pass in null and it wouldn't be too painful. That doesn't necessarily require them to have config files lying around. They could, if they wanted, use property files to create such Properties, and we would also offer to look for the configs in the streams properties. So the complexity of distributing property files is optional, and the user may choose to fill in the configs by code or by files.
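
A minimal sketch of how such a lookup could work, assuming a hypothetical naming schema of "<operationName>.<key>" inside the streams properties. The class name, the method name and the "caching.enabled" key are made up for illustration; only java.util.Properties is real.

```java
import java.util.Properties;

final class OperationConfigSketch {

    // Resolution order (an assumption, not an agreed design):
    // 1. the nullable per-operation Properties passed in code,
    // 2. the global streams properties under "<operationName>.<key>",
    // 3. the built-in default.
    static String resolve(Properties perOperation, Properties streamsProps,
                          String operationName, String key, String defaultValue) {
        if (perOperation != null && perOperation.getProperty(key) != null) {
            return perOperation.getProperty(key);
        }
        String scoped = streamsProps.getProperty(operationName + "." + key);
        return scoped != null ? scoped : defaultValue;
    }

    public static void main(String[] args) {
        Properties streams = new Properties();
        streams.setProperty("orders-aggregate.caching.enabled", "false");

        // The user passes null and still picks up the scoped override.
        System.out.println(
            resolve(null, streams, "orders-aggregate", "caching.enabled", "true"));
    }
}
```

Passing null keeps the call site short, while the same operation can still be tuned from code or from a file without any further overloads.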

I think these steps can rescue the proper abstraction of a KTable. I believe that with the current proposals we are only sugarcoating problem 1 and will end up with a broken idea of what a KTable is. I think it will be even harder to develop further from there. Interface-wise, my proposal is like developing backwards, as I am very certain we took a wrong turn with IQ that we shouldn't try to carry through.

I hope I could explain how this refactoring can tackle the 3 problems above, and especially why I don't think we can win in the long run by tackling only point 1. If anything needs an implementation draft, please feel free to ask me to provide one. Initially, the proposal would hopefully get the job done of just removing clutter.

Looking forward to your comments.

Best Jan



On 12.07.2017 21:27, Guozhang Wang wrote:
Hello Jan,

Thanks for your feedback. Let me try to clarify a few things about the problems that we are trying to resolve and the motivations behind the current proposals.

As Matthias mentioned, one issue that we are trying to tackle is to reduce the number of overloaded functions in the DSL caused by the serde overrides / state store supplier overrides that are needed for repartitioning or for state store materialization. Another related issue is that the current state store supplier override is not very natural to use, for example:

1) If a user just wants to disable caching / logging etc. but does not want to change the underlying store engine at all, she needs to learn, for example, whether a windowed store or a key-value store is needed for this specific operator in the DSL and which serdes are needed to materialize the store, in order to create a StateStoreSupplier with caching / logging disabled and then pass it into the DSL.

2) Similarly, if a user just wants to set different topic configs for the changelog topic, she still needs to pass the whole StateStoreSupplier into the operator.

3) If a user wants to use a different store engine (e.g. MyStore rather than RocksDBStore) underneath but does not care about the default settings for logging, caching, etc., he STILL needs to pass the whole StateStoreSupplier into the operator.

Note that all the above scenarios are for advanced users who do want to override these settings. Users who are OK with the default settings should not be exposed to such APIs at all; like you said, "I do not be exposed with any of such implementation details", if you do not care.

-----------------

We have been talking about configs vs. code for such settings, since we have been using configs for "global" default configs; but the argument against using configs for such per-operator / per-store settings as well is that it will simply make configs hard to manage / hard to wire with tools. Personally speaking, I'm not a big fan of using configs for per-entity overrides, and that is mainly from my experience with Samza: Samza takes exactly this approach for per-stream / per-source configs:

http://samza.apache.org/learn/documentation/0.13/jobs/configuration-table.html ([system-name][stream-id] etc. are all placeholders)

The main issues were: 1) users making config changes need to deploy them to all the instances. I think for Streams it would be even worse, as we need to keep a config file on each of the running instances, and whenever there is a change we need to make sure it is propagated to all of them. 2) Whenever users make some code changes, e.g. to add a new stream / system, they need to remember to make the corresponding changes in the config files as well, and they kept forgetting about it. The lesson learned there was that it is always better to change one place (code change) than two (code change + config file change).

Again, this is not saying we have vetoed this option, and if people have good reasons for this let's discuss them here.

-----------------

So the current proposals are mainly about keeping configs for the global default settings while still allowing users to override per-operator / per-store settings in code, and also about not forcing users to think about such implementation details if they are fine with the default settings. For example:

As a normal user it is sufficient to specify a join as

```
table4.join(table5, joiner).table();
```

in which she can still just focus on the computational logic, with all implementation details abstracted away. Only if the user is familiar enough with the implementation details (e.g. how the joined tables are materialized into state stores, etc.) and wants to specify her own settings (e.g. I want to swap in my own state store engine, or I want to disable caching for dedup, or use a different serde, etc.) can she "explore" them with the DSL again:

```
table4.join(table5, joiner).table(Materialized.as("store1")); // use a custom store name for interactive query
table4.join(table5, joiner).table(Materialized.as(MyStoreSupplier)); // use a custom store engine
table4.join(table5, joiner).table(Materialized.as("store1").withLoggingEnabled(configs)); // use a custom store changelog topic configs
// ... more
```
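
A minimal sketch of what such an options object could look like, in plain Java. MaterializedSketch is a made-up class for illustration and not the proposed or actual Streams class; the method names as(...) and withLoggingEnabled(...) mirror the calls above, while withCachingDisabled() and the defaults (caching and logging on) are assumptions.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical options object: defaults for everything, and each
// override is one fluent call instead of one more DSL overload.
final class MaterializedSketch {
    final String storeName;
    final boolean cachingEnabled;
    final boolean loggingEnabled;
    final Map<String, String> topicConfig;

    private MaterializedSketch(String storeName, boolean cachingEnabled,
                               boolean loggingEnabled, Map<String, String> topicConfig) {
        this.storeName = storeName;
        this.cachingEnabled = cachingEnabled;
        this.loggingEnabled = loggingEnabled;
        this.topicConfig = topicConfig;
    }

    // Only the store name is required; the rest starts from assumed defaults.
    static MaterializedSketch as(String storeName) {
        return new MaterializedSketch(storeName, true, true,
                                      Collections.<String, String>emptyMap());
    }

    MaterializedSketch withCachingDisabled() {
        return new MaterializedSketch(storeName, false, loggingEnabled, topicConfig);
    }

    // Changelog topic configs are copied so later caller changes do not leak in.
    MaterializedSketch withLoggingEnabled(Map<String, String> config) {
        Map<String, String> copy =
            Collections.unmodifiableMap(new HashMap<String, String>(config));
        return new MaterializedSketch(storeName, cachingEnabled, true, copy);
    }

    public static void main(String[] args) {
        Map<String, String> configs = new HashMap<>();
        configs.put("retention.ms", "86400000");
        MaterializedSketch m = MaterializedSketch.as("store1")
                                                 .withCachingDisabled()
                                                 .withLoggingEnabled(configs);
        System.out.println(m.storeName + " caching=" + m.cachingEnabled
                           + " topicConfig=" + m.topicConfig);
    }
}
```

With a shape like this, an operator would need a single optional parameter rather than one overload per combination of store name, supplier and logging config.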

Hope it helps.


Guozhang


On Fri, Jul 7, 2017 at 3:42 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

    It makes me want to cry.

    why on earth is the DSL going to expose all its implementation
    details now?
    especially being materialized or not.

    If we want to take useful steps in that direction, maybe we are
    looking for a way to let the user switch back and forth between
    PAPI and DSL?

    A change like the one proposed would not eliminate any of my pain
    points while still being a heck of a lot of work to migrate to.

    Since I am only following this from the point where Eno CC'ed it
    into the users list:

    Can someone please rephrase for me what problem this is trying to
    solve? I don't mean to be rude, but it uses a problematic feature,
    "StateStoreSuppliers in DSL", to justify making it even worse. This
    gets us nowhere in making the configs more flexible; it's just
    syntactic sugar.

    How about a low-effort shot like: let's add a Properties argument
    to operations that would otherwise become too heavily overloaded?
    Or pull the configs by some naming schema from the overall
    properties. In addition to that, we get rid of
    StateStoreSuppliers in the DSL and have them also configured by
    said properties.

    => way easier to migrate to, way less risk, way more flexible in
    the future (different implementations of the same operation don't
    require code change to configure)

    Line 184 makes especially no sense to me. what is a KTableKTable
    non materialized join anyways?

    Hope we can discuss more on this.




    On 07.07.2017 17:23, Guozhang Wang wrote:

        I messed the indentation on github code repos; this would be
        easier to read:

        https://codeshare.io/GLWW8K


        Guozhang


        On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang
        <wangg...@gmail.com> wrote:

            Hi Damian / Kyle,

            I think I agree with you guys about the pros / cons of
            using the builder pattern vs. using some "secondary
            classes", and I'm wondering if we can take a middle way
            between these two. I spent some time on a slightly
            different approach from Damian's current proposal:

            
            https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/java/org/apache/kafka/streams/RefactoredAPIs.java

            The key idea is to let the final "table()" or "stream()"
            function "upgrade" from the secondary classes to the
            first-class citizen classes, while having all the specs
            inside this function. This proposal also includes some
            other refactorings that people have been discussing for
            the builder, to reduce the overloaded functions as well.
            WDYT?


            Guozhang


            On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy
            <damian....@gmail.com> wrote:

                Hi Jan,

                Thanks very much for the input.

                On Tue, 4 Jul 2017 at 08:54 Jan Filipiak
                <jan.filip...@trivago.com> wrote:

                    Hi Damian,

                    I do see your point that something needs to change.
                    But I fully agree with Guozhang when he says:
                    ---
                    But since this is an incompatible change, and we
                    are going to remove the compatibility annotations
                    soon, it means we only have one chance and we
                    really have to make it right.
                    ---


                I think we all agree on this one! Hence the discussion.


                    I fear all suggestions do not go far enough to
                    become something that will carry on for very much
                    longer.
                    I am currently working on KAFKA-3705 and trying to
                    find the easiest way for the user to give me all
                    the required functionality. The easiest interface
                    I could come up with so far can be looked at here:


                    https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622


                    And it's already horribly complicated. I am
                    currently unable to find the right abstraction
                    level to have everything falling into place
                    naturally. To be honest, I already think
                    introducing

                To be fair that is not a particularly easy problem to
                solve!


                    https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L493

                    was not ideal and makes everything a mess.


                I'm not sure i agree that it makes everything a mess,
                but It could have
                been done differently.

                    The JoinType:Whatever is also not really flexible.
                    Two things come to my mind:

                    1. I don't think we should rule out config-based
                    decisions, say configs like

                    streams.$applicationID.joins.$joinname.conf = value

                Is this just for config? Or are you suggesting that we
                could somehow
                "code"
                the join in a config file?


                    This can allow for tremendous changes without a
                    single API change, and IMO it was not considered
                    enough yet.

                    2. Push logic from the DSL to the callback
                    classes. A ValueJoiner, for example, can be used
                    to implement different join types as the user
                    wishes.

                Do you have an example of how this might look?


                    As Guozhang said: to stop breaking users is very
                    important,


                Of course. We want to make it as easy as possible for
                people to use
                streams.


                    especially with these changes + all the plans I
                    sadly only have in my head, but hopefully the
                    first link can give a glimpse.

                    Thanks for preparing the examples; they made it
                    way clearer to me what exactly we are talking
                    about. I would argue to go a bit slower and more
                    carefully on this one. At some point we need to
                    get it right. Peeking over to the Hadoop guys
                    with their huge user base: config files really
                    work well for them.

                    Best Jan





                    On 30.06.2017 09:31, Damian Guy wrote:

                        Thanks Matthias

                        On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax
                        <matth...@confluent.io
                        <mailto:matth...@confluent.io>>

                    wrote:

                            I am just catching up on this thread, so
                            sorry for the long email in advance...
                            Also, it's to some extent a dump of
                            thoughts and not always a clear proposal.
                            Still need to think about this in more
                            detail. But maybe it helps others to get
                            new ideas :)


                                    However, I don't understand your
                                    argument about putting aggregate()
                                    after the withXX() -- all the
                                    calls to withXX() set optional

                    parameters

                                    for aggregate() and not for
                                    groupBy() -- but a groupBy().withXX()
                                    indicates that the withXX()
                                    belongs to the groupBy(). IMHO, this

                might

                                    be quite confusion for developers.


                                I see what you are saying, but the
                                grouped stream is effectively a

                    no-op

                                until you call one of the
                                aggregate/count/reduce etc functions. So

                the

                                optional params are ones that are
                                applicable to any of the

                operations

                    you

                                can perform on this grouped stream.
                                Then the final
                                count()/reduce()/aggregate() call has
                                any of the params that are
                                required/specific to that function.

                            I understand your argument, but I don't
                            share the conclusion. If we need a
                            "final/terminal" call, the better way
                            might be

                            .groupBy().count().withXX().build()

                            (with a better name for build() though)


                        The point is that all the other calls, i.e.,
                        withBlah, windowed, etc. apply to all the
                        aggregate functions, the terminal call being
                        the actual type of aggregation you want to do.
                        I personally find this more natural than
                        groupBy().count().withBlah().build()


                                groupedStream.count(/** non windowed
                                count**/)
                                
groupedStream.windowed(TimeWindows.of(10L)).count(...)
                                
groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)

                            I like this. However, I don't see a reason
                            to have windowed() and
                            sessionWindowed(). We should have one
                            top-level `Windows` interface

                that

                            both `TimeWindows` and `SessionWindows`
                            implement and just have a

                single

                            windowed() method that accepts all
                            `Windows`. (I did not like the
                            separation of `SessionWindows` in the
                            first place, and this seems to

                be

                            an opportunity to clean this up. It was
                            hard to change when we
                            introduced session windows)

                        Yes - true we should look into that.


                            Btw: we do use the imperative groupBy()
                            and groupByKey(), and thus we might also
                            want to use windowBy() (instead of
                            windowed()). Not sure how important this
                            is, but it seems to be inconsistent
                            otherwise.


                        Makes sense


                            About joins:  I don't like
                            .withJoinType(JoinType.LEFT) at all. I

                think,

                            defining an inner/left/outer join is not
                            an optional argument but a
                            first class concept and should have a
                            proper representation in the

                API

                            (like the current methods join(),
                            leftJoin, outerJoin()).


                        Yep, i did originally have it as a required
                        param and maybe that is

                what

                    we

                        go with. It could have a default, but maybe
                        that is confusing.



                            About the two join API proposals, the
                            second one has too much boiler
                            plate code for my taste. Also, the actual
                            join() operator has only

                one

                            argument what is weird to me, as in my
                            thinking process, the main
                            operator call, should have one parameter
                            per mandatory argument but

                your

                            proposal put the mandatory arguments into
                            Joins.streamStreamJoin()

                call.

                            This is far from intuitive IMHO.


                        This is the builder pattern, you only need one
                        param as the builder

                has

                        captured all of the required and optional
                        arguments.


                            The first join proposal also seems to
                            align better with the pattern
                            suggested for aggregations and having the
                            same pattern for all

                operators

                            is important (as you stated already).


                        This is why i offered two alternatives as i
                        started out with. 1 is the
                        builder pattern, the other is the more fluent
                        pattern.


                            Coming back to the config vs optional
                            parameter. What about having a
                            method withConfig[s](...) that allow to
                            put in the configuration?


                        Sure, it is currently called withLogConfig()
                        as that is the only thing

                    that

                        is really config.


                            This also raises the question of whether
                            until() is a windows property.
                            Actually, until() seems to be a
                            configuration parameter and thus should
                            not have its own method.


                        Hmmm, i don't agree. Until is a property of
                        the window. It is going

                to be

                        potentially different for every window
                        operation you do in a streams

                app.


                            Browsing through your example DSL branch,
                            I also saw this one:

                                final KTable<Windowed<String>, Long> windowed =
                                    groupedStream.counting()
                                                 .windowed(TimeWindows.of(10L).until(10))
                                                 .table();

                            This is an interesting idea, and it reminds
                            me of some feedback along the lines of "I
                            wanted to count a stream, but there was no
                            count() method -- I first needed to figure
                            out that I need to group the stream first
                            to be able to count it. It does make sense
                            in hindsight but was not obvious in the
                            beginning". Thus, carrying out this
                            thought, we could also do the following:

                            stream.count().groupedBy().windowedBy().table();

                            -> Note, I use "grouped" and "windowed"
                            instead of imperative here,

                as

                            it comes after the count()

                            This would be more consistent than your
                            proposal (that has grouping
                            before but windowing after count()). It
                            might even allow us to enrich
                            the API with a some syntactic sugar like
                            `stream.count().table()` to

                get

                            the overall count of all records (this
                            would obviously not scale,

                but we

                            could support it -- if not now, maybe later).


                        I guess i'd prefer
                        stream.groupBy().windowBy().count()
                        stream.groupBy().windowBy().reduce()
                        stream.groupBy().count()

                        As i said above, everything that happens
                        before the final aggregate

                call

                        can be applied to any of them. So it makes
                        sense to me to do those

                things

                        ahead of the final aggregate call.


                            Last about builder pattern. I am convinced
                            that we need some

                "terminal"

                            operator/method that tells us when to add
                            the processor to the

                topology.

                            But I don't see the need for a plain
                            builder pattern that feels

                alien to

                            me (see my argument about the second join
                            proposal). Using .stream()

                /

                            .table() as use in many examples might
                            work. But maybe a more generic
                            name that we can use in all places like
                            build() or apply() might

                also be

                            an option.


                        Sure, a generic name might be ok.




                            -Matthias



                            On 6/29/17 7:37 AM, Damian Guy wrote:

                                Thanks Kyle.

                                On Thu, 29 Jun 2017 at 15:11 Kyle
                                Winkelman <

                winkelman.k...@gmail.com
                <mailto:winkelman.k...@gmail.com>>

                                wrote:

                                    Hi Damian,

                                                    When trying to
                                                    program in the
                                                    fluent API that
                                                    has been

                discussed

                            most

                                    it

                                                    feels difficult to
                                                    know when you will
                                                    actually get an object

                you

                    can

                                    reuse.

                                                    What if I make one
                                                    KGroupedStream
                                                    that I want to
                                                    reuse, is it

                    legal

                            to

                                                    reuse it or does
                                                    this approach
                                                    expect you to call
                                                    grouped each

                    time?

                                            I'd anticipate that once
                                            you have a KGroupedStream
                                            you can

                re-use it

                            as

                                    you

                                            can today.

                                    You said it yourself in another
                                    post that the grouped stream is
                                    effectively a no-op until a count,
                                    reduce, or aggregate. The way I

                see

                            it

                                    you wouldn’t be able to reuse
                                    anything except KStreams and KTables,

                            because

                                    most of this fluent api would
                                    continue returning this (this being

                the

                                    builder object currently being
                                    manipulated).

                                So, if you ever store a reference to
                                anything but KStreams and

                KTables

                            and

                                    you use it in two different ways
                                    then its possible you make

                    conflicting

                                    withXXX() calls on the same builder.


                                Not necessarily true. It could return
                                a new instance of the builder, i.e.,
                                the builders being immutable. So if
                                you held a reference to the builder it
                                would always be the same as it was
                                when it was created.
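
A minimal sketch of the difference being discussed, in plain Java. Both classes are hypothetical stand-ins for a grouped-stream builder, and serdes are represented by plain strings to keep the example self-contained. The mutable variant returns this from withKeySerde(), so a stored reference changes underneath the user; the immutable variant returns a new instance.

```java
// Mutable variant: withKeySerde() changes the object and returns `this`.
final class MutableGroupedSketch {
    String keySerde = "default";

    MutableGroupedSketch withKeySerde(String serde) {
        this.keySerde = serde;
        return this;
    }
}

// Immutable variant: withKeySerde() leaves the receiver untouched.
final class ImmutableGroupedSketch {
    final String keySerde;

    private ImmutableGroupedSketch(String keySerde) {
        this.keySerde = keySerde;
    }

    static ImmutableGroupedSketch defaults() {
        return new ImmutableGroupedSketch("default");
    }

    ImmutableGroupedSketch withKeySerde(String serde) {
        return new ImmutableGroupedSketch(serde);
    }
}

class BuilderAliasingSketch {
    public static void main(String[] args) {
        MutableGroupedSketch withDefaults = new MutableGroupedSketch();
        MutableGroupedSketch withDeclared = withDefaults.withKeySerde("custom");
        // Both variables point at the same object, so the "default"
        // reference has silently picked up the custom serde.
        System.out.println(withDefaults.keySerde + " " + (withDefaults == withDeclared));

        ImmutableGroupedSketch base = ImmutableGroupedSketch.defaults();
        ImmutableGroupedSketch declared = base.withKeySerde("custom");
        // The original reference still carries the default serde.
        System.out.println(base.keySerde + " " + declared.keySerde);
    }
}
```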


                                    GroupedStream<K,V> groupedStreamWithDefaultSerdes =
                                        kStream.grouped();

                                    GroupedStream<K,V> groupedStreamWithDeclaredSerdes =
                                        groupedStreamWithDefaultSerdes.withKeySerde(…).withValueSerde(…);

                                    I’ll admit that this shouldn’t happen but some
                                    user is going to do it eventually…
                                    Depending on implementation, uses of
                                    groupedStreamWithDefaultSerdes would most likely
                                    be equivalent to the version withDeclaredSerdes.
                                    One workaround would be to always make copies of
                                    the config objects you are building, but this
                                    approach has its own problem because now we have
                                    to identify which configs are equivalent so we
                                    don’t create repeated processors.

                                    The point of this long-winded example is that we
                                    always have to be thinking about all of the
                                    possible ways it could be misused by a user
                                    (causing them to see hard-to-diagnose problems).

                                Exactly! That is the point of the
                                discussion really.


                                    In my attempt at a couple methods
                                    with builders I feel that I could
                                    confidently say the user couldn’t
                                    really mess it up.

                                        // Count
                                        KTable<String, Long> count =
                                            kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));

                                    The kGroupedStream is reusable and if they
                                    attempted to reuse the Count for some reason it
                                    would throw an error message saying that a store
                                    named “my-store” already exists.
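A rough sketch of the duplicate-name check described above, assuming a hypothetical StoreRegistry class that tracks store names for one topology. The class, its method, and the exception type are illustrative only and are not taken from the Kafka Streams code base:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical registry of state store names within a single topology.
final class StoreRegistry {
    private final Set<String> names = new HashSet<>();

    // Registers a store name, rejecting a name that is already taken.
    void register(String storeName) {
        if (!names.add(storeName)) {
            throw new IllegalStateException(
                "A store named \"" + storeName + "\" already exists");
        }
    }

    public static void main(String[] args) {
        StoreRegistry registry = new StoreRegistry();
        registry.register("my-store");
        try {
            registry.register("my-store"); // reusing the same Count config
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point is only that reusing a config object fails loudly at build time instead of silently sharing a store.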


                                Yes, I agree, and I think using builders
                                is my preferred pattern.

                                Cheers,
                                Damian


                                    Thanks,
                                    Kyle

                                    From: Damian Guy
                                    Sent: Thursday, June 29, 2017 3:59 AM
                                    To: d...@kafka.apache.org
                                    Subject: Re: [DISCUSS] Streams
                                    DSL/StateStore Refactoring

                                    Hi Kyle,

                                    Thanks for your input. Really
                                    appreciated.

                                    On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman
                                    <winkelman.k...@gmail.com> wrote:

                                        I like more of a builder pattern even though
                                        others have voiced against it. The reason I
                                        like it is because it makes it clear to the
                                        user that a call to KGroupedStream#count will
                                        return a KTable, not some intermediate class
                                        that I need to understand.

                                    Yes, that makes sense.


                                        When trying to program in the fluent API that
                                        has been discussed most, it feels difficult to
                                        know when you will actually get an object you
                                        can reuse.
                                        What if I make one KGroupedStream that I want
                                        to reuse, is it legal to reuse it or does this
                                        approach expect you to call grouped each time?

                                    I'd anticipate that once you have a
                                    KGroupedStream you can re-use it as you can
                                    today.


                                        This question doesn’t pop into my head at all
                                        in the builder pattern; I assume I can reuse
                                        everything.
                                        Finally, I like .groupByKey and
                                        .groupBy(KeyValueMapper), not a big fan of
                                        the grouped.

                                    Yes, grouped() was more for demonstration and
                                    because groupBy() and groupByKey() were taken!
                                    So I'd imagine the API would actually want to be
                                    groupByKey(/** no required args ***/).withOptionalArg()
                                    and groupBy(KeyValueMapper m).withOptionalArg(...).
                                    Of course this all depends on maintaining
                                    backward compatibility.


                                        Unfortunately, the below approach would
                                        require at least 2 (probably 3) overloads (one
                                        for returning a KTable and one for returning a
                                        KTable with Windowed Key, probably would want
                                        to split windowed and sessionwindowed for ease
                                        of implementation) of each count, reduce, and
                                        aggregate.
                                        Obviously not exhaustive but enough for you to
                                        get the picture. Count, Reduce, and Aggregate
                                        supply 3 static methods to initialize the
                                        builder:

                                        // Count
                                        KTable<String, Long> count =
                                            groupedStream.count(Count.count().withQueryableStoreName("my-store"));

                                        // Windowed Count
                                        KTable<Windowed<String>, Long> windowedCount =
                                            groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10))
                                                .withQueryableStoreName("my-windowed-store"));

                                        // Session Count
                                        KTable<Windowed<String>, Long> sessionCount =
                                            groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L))
                                                .withQueryableStoreName("my-session-windowed-store"));

                                    Above and below, I think I'd prefer it to be:
                                    groupedStream.count(/** non windowed count **/)
                                    groupedStream.windowed(TimeWindows.of(10L)).count(...)
                                    groupedStream.sessionWindowed(SessionWindows.with(10L)).count(...)
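One way to picture the shape preferred here, as a sketch only: the window is chosen by a method on the grouped stream, which returns a different intermediate type whose count() has the windowed return type. Grouped and WindowedGrouped are hypothetical names, and strings stand in for the KTable types so the example runs without Kafka on the classpath:

```java
// Hypothetical types illustrating "window first, then count"; not Kafka's API.
final class FluentDemo {
    static final class Grouped {
        // Non-windowed count: one result per key.
        String count() {
            return "KTable<K, Long>";
        }

        // Choosing a window switches to a type whose count() is windowed.
        WindowedGrouped windowed(long sizeMs) {
            return new WindowedGrouped(sizeMs);
        }
    }

    static final class WindowedGrouped {
        private final long sizeMs;

        WindowedGrouped(long sizeMs) {
            this.sizeMs = sizeMs;
        }

        String count() {
            return "KTable<Windowed<K>, Long> over " + sizeMs + " ms windows";
        }
    }

    public static void main(String[] args) {
        Grouped grouped = new Grouped();
        System.out.println(grouped.count());
        System.out.println(grouped.windowed(10L).count());
    }
}
```

Under this arrangement each count() needs only one signature per intermediate type, instead of one overload per window kind on the same class.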




                                        // Reduce
                                        Reducer<Long> reducer;
                                        KTable<String, Long> reduce =
                                            groupedStream.reduce(reducer,
                                                Reduce.reduce().withQueryableStoreName("my-store"));

                                        // Aggregate Windowed with Custom Store
                                        Initializer<String> initializer;
                                        Aggregator<String, Long, String> aggregator;
                                        KTable<Windowed<String>, String> aggregate =
                                            groupedStream.aggregate(initializer, aggregator,
                                                Aggregate.windowed(TimeWindows.of(10L).until(10))
                                                    .withStateStoreSupplier(stateStoreSupplier));

                                        // Cogroup SessionWindowed
                                        KTable<String, String> cogrouped =
                                            groupedStream1.cogroup(aggregator1)
                                                .cogroup(groupedStream2, aggregator2)
                                                .aggregate(initializer, aggregator,
                                                    Aggregate.sessionWindowed(SessionWindows.with(10L), sessionMerger)
                                                        .withQueryableStoreName("my-store"));



                                        public class Count {

                                              public static class Windowed extends Count {
                                                  private Windows windows;
                                              }
                                              public static class SessionWindowed extends Count {
                                                  private SessionWindows sessionWindows;
                                              }

                                              public static Count count();
                                              public static Windowed windowed(Windows windows);
                                              public static SessionWindowed sessionWindowed(SessionWindows sessionWindows);

                                              // All withXXX(...) methods.
                                        }

                                        public class KGroupedStream {
                                              public KTable<K, Long> count(Count count);
                                              public KTable<Windowed<K>, Long> count(Count.Windowed count);
                                              public KTable<Windowed<K>, Long> count(Count.SessionWindowed count);
                                              …
                                        }
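To make the overload idea above concrete, here is a small self-contained sketch, under the assumption that the return type is picked by the static type of the config object. Strings stand in for the KTable types, the window is reduced to a single long, and CountDemo and its members are illustrative names only:

```java
// Hypothetical reduction of the Count / KGroupedStream sketch; not Kafka's API.
final class CountDemo {
    static class Count {
        static Count count() {
            return new Count();
        }

        static Windowed windowed(long sizeMs) {
            return new Windowed(sizeMs);
        }

        static class Windowed extends Count {
            final long sizeMs;

            Windowed(long sizeMs) {
                this.sizeMs = sizeMs;
            }
        }
    }

    // Overloads are resolved at compile time from the argument's static type.
    static String count(Count count) {
        return "KTable<K, Long>";
    }

    static String count(Count.Windowed count) {
        return "KTable<Windowed<K>, Long>";
    }

    public static void main(String[] args) {
        System.out.println(count(Count.count()));
        System.out.println(count(Count.windowed(10L)));

        // Caveat: a windowed config held in a variable typed as Count
        // selects the non-windowed overload.
        Count widened = Count.windowed(10L);
        System.out.println(count(widened));
    }
}
```

One consequence worth noting for this design is the last call: because Windowed extends Count, widening the variable type silently changes which overload is chosen, so the subclass relationship may be worth reconsidering.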


                                        Thanks,
                                        Kyle

                                        From: Guozhang Wang
                                        Sent: Wednesday, June 28, 2017
                                        7:45 PM
                                        To: d...@kafka.apache.org
                                        Subject: Re: [DISCUSS] Streams
                                        DSL/StateStore Refactoring

                                        I played with the current proposal a bit using
                                        https://github.com/dguy/kafka/tree/dsl-experiment,
                                        and here are my observations:

                                        1. Personally I prefer

                                              "stream.group(mapper) / stream.groupByKey()"

                                        to

                                              "stream.group().withKeyMapper(mapper) / stream.group()"

                                        Since 1) withKeyMapper is not enforced
                                        programmatically though it is not "really"
                                        optional like others, 2) syntax-wise it reads
                                        more naturally.

                                        I think it is okay to add the APIs in
                                        (https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/main/java/org/apache/kafka/streams/kstream/GroupedStream.java)
                                        in KGroupedStream.


                                        2. For the "withStateStoreSupplier" API, is
                                        the user supposed to pass in the innermost
                                        state store supplier (e.g. the one whose get()
                                        returns RocksDBStore), or the outermost
                                        supplier with logging / metrics / etc? I think
                                        it would be more useful to only require users
                                        to pass in the inner state store supplier
                                        while specifying caching / logging through
                                        other APIs.

                                        In addition, the "GroupedWithCustomStore" is a
                                        bit suspicious to me: we are allowing users to
                                        call other APIs like "withQueryableName"
                                        multiple times, but to call
                                        "withStateStoreSupplier" only once at the end.
                                        Why is that?

                                        3. The current DSL seems to be only for
                                        aggregations, what about joins?


                                        4. I think it is okay to keep the
                                        "withLogConfig": for the StateStoreSupplier it
                                        will still be user code specifying the
                                        topology so I do not see there is a big
                                        difference.


                                        5. "WindowedGroupedStream"'s
                                        withStateStoreSupplier should take the
                                        windowed state store supplier to enforce
                                        typing?


                                        Below are minor ones:

                                        6. "withQueryableName": maybe
                                        better "withQueryableStateName"?

                                        7. "withLogConfig": maybe
                                        better "withLoggingTopicConfig()"?



                                        Guozhang



                                        On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax
                                        <matth...@confluent.io> wrote:

                                            I see your point about "when to add the
                                            processor to the topology". That is indeed
                                            an issue. Not sure if we could allow
                                            "updates" to the topology...

                                            I don't see any problem with having all the
                                            withXX() in KTable interface -- but this
                                            might be subjective.


                                            However, I don't understand your argument
                                            about putting aggregate() after the withXX()
                                            -- all the calls to withXX() set optional
                                            parameters for aggregate() and not for
                                            groupBy() -- but a groupBy().withXX()
                                            indicates that the withXX() belongs to the
                                            groupBy(). IMHO, this might be quite
                                            confusing for developers.


                                            -Matthias

                                            On 6/28/17 2:55 AM, Damian
                                            Guy wrote:

                                                    I also think that mixing optional
                                                    parameters with configs is a bad
                                                    idea. Have no proposal for this atm
                                                    but just wanted to mention it. Hope
                                                    to find some time to come up with
                                                    something.


                                                Yes, I don't like the mix of config
                                                either. But the only real config here is
                                                the logging config - which we don't
                                                really need as it can already be done
                                                via a custom StateStoreSupplier.


                                                    What I don't like in the current
                                                    proposal is the
                                                    .grouped().withKeyMapper() -- the
                                                    current solution with .groupBy(...)
                                                    and .groupByKey() seems better. For
                                                    clarity, we could rename to
                                                    .groupByNewKey(...) and
                                                    .groupByCurrentKey() (even if we
                                                    should find some better names).


                                                it could be
                                                groupByKey(),
                                                groupBy() or something
                                                different bt



                                                    The proposed pattern "chains"
                                                    grouping and aggregation too close
                                                    together. I would rather separate
                                                    both more than less, i.e., go in the
                                                    opposite direction.

                                                    I am also wondering if we could do
                                                    something more "fluent". The initial
                                                    proposal was like:

                                                        groupedStream.count()
                                                            .withStoreName("name")
                                                            .withCachingEnabled(false)
                                                            .withLoggingEnabled(config)
                                                            .table()

                                                    The .table()
                                                    statement in the
                                                    end was kinda alien.

                                                I agree, but then all of the withXXX
                                                methods need to be on KTable which is
                                                worse in my opinion. You also need
                                                something that is going to "build" the
                                                internal processors and add them to the
                                                topology.


                                                    The current proposal puts the
                                                    count() at the end -- i.e., the
                                                    optional parameters for count() have
                                                    to be specified on the .grouped()
                                                    call -- this does not seem to be the
                                                    best way either.


                                                I actually prefer this method as you are
                                                building a grouped stream that

    ...

[Message clipped]



--
-- Guozhang
