Hi Damian,

>>>> When trying to program in the fluent API that has been discussed most it
>>>> feels difficult to know when you will actually get an object you can reuse.
>>>> What if I make one KGroupedStream that I want to reuse, is it legal to
>>>> reuse it or does this approach expect you to call grouped each time?

>> I'd anticipate that once you have a KGroupedStream you can re-use it as you
>> can today.

You said it yourself in another post that the grouped stream is effectively a 
no-op until a count, reduce, or aggregate. The way I see it, you wouldn’t be 
able to reuse anything except KStreams and KTables, because most of this fluent 
API would keep returning this (this being the builder object currently being 
manipulated). So, if you ever store a reference to anything but KStreams and 
KTables and use it in two different ways, then it’s possible you make 
conflicting withXXX() calls on the same builder.

GroupedStream<K, V> groupedStreamWithDefaultSerdes = kStream.grouped();
GroupedStream<K, V> groupedStreamWithDeclaredSerdes = 
    groupedStreamWithDefaultSerdes.withKeySerde(…).withValueSerde(…);

I’ll admit that this shouldn’t happen, but some user is going to do it 
eventually…
Depending on the implementation, uses of groupedStreamWithDefaultSerdes would 
most likely be equivalent to the withDeclaredSerdes version. One workaround 
would be to always make copies of the config objects you are building, but this 
approach has its own problem: now we have to identify which configs are 
equivalent so we don’t create repeated processors.
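To make the aliasing hazard concrete, here is a minimal, self-contained sketch. The class and method names are stand-ins, not the real Kafka Streams API: it just models a withXXX() that mutates the builder and returns this, which is the style under discussion.

```java
// Hypothetical stand-in for the fluent style -- NOT the real Kafka Streams
// API. withKeySerde() mutates the builder in place and returns `this`,
// which is what creates the aliasing problem described above.
class FluentGroupedStream {
    String keySerde = "default";

    FluentGroupedStream withKeySerde(String serde) {
        this.keySerde = serde;  // mutation is visible through every alias
        return this;
    }
}

public class AliasingDemo {
    public static void main(String[] args) {
        FluentGroupedStream withDefaults = new FluentGroupedStream();
        FluentGroupedStream withDeclared = withDefaults.withKeySerde("Serdes.Long()");

        // Both variables alias the same builder, so the "default" stream
        // has silently picked up the declared serde.
        System.out.println(withDefaults == withDeclared);   // true
        System.out.println(withDefaults.keySerde);          // Serdes.Long()
    }
}
```

Under these assumptions, the two references are indistinguishable, which is exactly why keeping a handle on the "default" stream is unsafe.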

The point of this long-winded example is that we always have to be thinking 
about all of the possible ways it could be misused by a user (causing them to 
see hard-to-diagnose problems).

In my attempt at a couple of methods with builders, I feel that I could 
confidently say the user couldn’t really mess it up.
> // Count
> KTable<String, Long> count =
> kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));
The kGroupedStream is reusable, and if they attempted to reuse the Count for 
some reason it would throw an error message saying that a store named 
“my-store” already exists.
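A sketch of why that holds, again with hypothetical stand-in classes rather than the real API: Count is an immutable value object, and the topology tracks registered store names, so reusing the same Count (and hence the same name) fails fast.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-ins, NOT the real Kafka Streams classes: Count is an
// immutable value object, and Topology rejects a second registration of
// the same queryable store name.
class Count {
    final String storeName;
    private Count(String storeName) { this.storeName = storeName; }

    static Count count() { return new Count(null); }

    Count withQueryableStoreName(String name) { return new Count(name); }
}

class Topology {
    private final Set<String> storeNames = new HashSet<>();

    void registerCount(Count count) {
        if (count.storeName != null && !storeNames.add(count.storeName)) {
            throw new IllegalStateException(
                    "A store named \"" + count.storeName + "\" already exists");
        }
    }
}

public class StoreNameCollisionDemo {
    public static void main(String[] args) {
        Topology topology = new Topology();
        Count count = Count.count().withQueryableStoreName("my-store");

        topology.registerCount(count);       // first use: fine
        try {
            topology.registerCount(count);   // accidental reuse: rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the config object carries no mutable builder state, the worst a user can do is trigger the duplicate-store error at topology-build time rather than silently sharing state.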

Thanks,
Kyle

From: Damian Guy
Sent: Thursday, June 29, 2017 3:59 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring

Hi Kyle,

Thanks for your input. Really appreciated.

On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <winkelman.k...@gmail.com>
wrote:

> I like more of a builder pattern even though others have voiced against
> it. The reason I like it is because it makes it clear to the user that a
> call to KGroupedStream#count will return a KTable not some intermediate
> class that I need to understand.
>

Yes, that makes sense.


> When trying to program in the fluent API that has been discussed most it
> feels difficult to know when you will actually get an object you can reuse.
> What if I make one KGroupedStream that I want to reuse, is it legal to
> reuse it or does this approach expect you to call grouped each time?


I'd anticipate that once you have a KGroupedStream you can re-use it as you
can today.


> This question doesn’t pop into my head at all in the builder pattern I
> assume I can reuse everything.
> Finally, I like .groupByKey and .groupBy(KeyValueMapper) not a big fan of
> the grouped.
>
Yes, grouped() was more for demonstration and because groupBy() and
groupByKey() were taken! So I'd imagine the api would actually want to be
groupByKey(/** no required args **/).withOptionalArg() and
groupBy(KeyValueMapper m).withOptionalArg(...)  of course this all depends
on maintaining backward compatibility.


> Unfortunately, the below approach would require at least 2 (probably 3)
> overloads (one for returning a KTable and one for returning a KTable with
> Windowed Key; probably would want to split windowed and sessionwindowed for
> ease of implementation) of each count, reduce, and aggregate.
> Obviously not exhaustive but enough for you to get the picture. Count,
> Reduce, and Aggregate supply 3 static methods to initialize the builder:
> // Count
> KTable<String, Long> count =
> groupedStream.count(Count.count().withQueryableStoreName("my-store"));
>
> // Windowed Count
> KTable<Windowed<String>, Long> windowedCount =
> groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10)).withQueryableStoreName("my-windowed-store"));
>
> // Session Count
> KTable<Windowed<String>, Long> sessionCount =
> groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L)).withQueryableStoreName("my-session-windowed-store"));
>
>
Above and below, i think i'd prefer it to be:
groupedStream.count(/** non windowed count**/)
groupedStream.windowed(TimeWindows.of(10L)).count(...)
groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)




> // Reduce
> Reducer<Long> reducer;
> KTable<String, Long> reduce = groupedStream.reduce(reducer,
> Reduce.reduce().withQueryableStoreName("my-store"));
>
> // Aggregate Windowed with Custom Store
> Initializer<String> initializer;
> Aggregator<String, Long, String> aggregator;
> KTable<Windowed<String>, String> aggregate =
> groupedStream.aggregate(initializer, aggregator,
> Aggregate.windowed(TimeWindows.of(10L).until(10)).withStateStoreSupplier(stateStoreSupplier)));
>
> // Cogroup SessionWindowed
> KTable<String, String> cogrouped = groupedStream1.cogroup(aggregator1)
>         .cogroup(groupedStream2, aggregator2)
>         .aggregate(initializer, aggregator,
> Aggregate.sessionWindowed(SessionWindows.with(10L),
> sessionMerger).withQueryableStoreName("my-store"));
>
>
>
> public class Count {
>
>     public static class Windowed extends Count {
>         private Windows windows;
>     }
>     public static class SessionWindowed extends Count {
>         private SessionWindows sessionWindows;
>     }
>
>     public static Count count();
>     public static Windowed windowed(Windows windows);
>     public static SessionWindowed sessionWindowed(SessionWindows
> sessionWindows);
>
>     // All withXXX(...) methods.
> }
>
> public class KGroupedStream {
>     public KTable<K, Long> count(Count count);
>     public KTable<Windowed<K>, Long> count(Count.Windowed count);
>     public KTable<Windowed<K>, Long> count(Count.SessionWindowed count);
> …
> }
>
>
> Thanks,
> Kyle
>
> From: Guozhang Wang
> Sent: Wednesday, June 28, 2017 7:45 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>
> I played the current proposal a bit with https://github.com/dguy/kafka/
> tree/dsl-experiment <https://github.com/dguy/kafka/tree/dsl-experiment>,
> and here are my observations:
>
> 1. Personally I prefer
>
>     "stream.group(mapper) / stream.groupByKey()"
>
> than
>
>     "stream.group().withKeyMapper(mapper) / stream.group()"
>
> Since 1) withKeyMapper is not enforced programmatically though it is not
> "really" optional like others, 2) syntax-wise it reads more natural.
>
> I think it is okay to add the APIs in (
>
> https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/main/java/org/apache/kafka/streams/kstream/GroupedStream.java
> )
> in KGroupedStream.
>
>
> 2. For the "withStateStoreSupplier" API, is the user supposed to pass in
> the most-inner state store supplier (e.g. the one whose get() returns
> RocksDBStore), or is it supposed to return the most-outer supplier with
> logging / metrics / etc? I think it would be more useful to only require
> users to pass in the inner state store supplier while specifying caching /
> logging through other APIs.
>
> In addition, the "GroupedWithCustomStore" is a bit suspicious to me: we are
> allowing users to call other APIs like "withQueryableName" multiple times,
> but call "withStateStoreSupplier" only once at the end. Why is that?
>
>
> 3. The current DSL seems to be only for aggregations, what about joins?
>
>
> 4. I think it is okay to keep the "withLogConfig": for the
> StateStoreSupplier it will still be user code specifying the topology so I
> do not see there is a big difference.
>
>
> 5. "WindowedGroupedStream" 's withStateStoreSupplier should take the
> windowed state store supplier to enforce typing?
>
>
> Below are minor ones:
>
> 6. "withQueryableName": maybe better "withQueryableStateName"?
>
> 7. "withLogConfig": maybe better "withLoggingTopicConfig()"?
>
>
>
> Guozhang
>
>
>
> On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > I see your point about "when to add the processor to the topology". That
> > is indeed an issue. Not sure if we could allow "updates" to the
> > topology...
> >
> > I don't see any problem with having all the withXX() in KTable interface
> > -- but this might be subjective.
> >
> >
> > However, I don't understand your argument about putting aggregate()
> > after the withXX() -- all the calls to withXX() set optional parameters
> > for aggregate() and not for groupBy() -- but a groupBy().withXX()
> > indicates that the withXX() belongs to the groupBy(). IMHO, this might
> > be quite confusing for developers.
> >
> >
> > -Matthias
> >
> > On 6/28/17 2:55 AM, Damian Guy wrote:
> > >> I also think that mixing optional parameters with configs is a bad
> > >> idea. Have no proposal for this atm but just wanted to mention it. Hope
> > >> to find some time to come up with something.
> > >>
> > >>
> > > Yes, i don't like the mix of config either. But the only real config
> > > here is the logging config - which we don't really need as it can
> > > already be done via a custom StateStoreSupplier.
> > >
> > >
> > >> What I don't like in the current proposal is the
> > >> .grouped().withKeyMapper() -- the current solution with .groupBy(...)
> > >> and .groupByKey() seems better. For clarity, we could rename to
> > >> .groupByNewKey(...) and .groupByCurrentKey() (even if we should find
> > >> some better names).
> > >>
> > >>
> > > it could be groupByKey(), groupBy() or something different bt
> > >
> > >
> > >
> > >> The proposed pattern "chains" grouping and aggregation too close
> > >> together. I would rather separate both more than less, i.e., go in the
> > >> opposite direction.
> > >>
> > >> I am also wondering if we could do something more "fluent". The
> > >> initial proposal was like:
> > >>
> > >>>> groupedStream.count()
> > >>>>    .withStoreName("name")
> > >>>>    .withCachingEnabled(false)
> > >>>>    .withLoggingEnabled(config)
> > >>>>    .table()
> > >>
> > >> The .table() statement in the end was kinda alien.
> > >>
> > >
> > > I agree, but then all of the withXXX methods need to be on KTable,
> > > which is worse in my opinion. You also need something that is going to
> > > "build" the internal processors and add them to the topology.
> > >
> > >
> > >> The current proposal puts the count() at the end -- i.e., the optional
> > >> parameters for count() have to be specified on the .grouped() call --
> > >> this does not seem to be the best way either.
> > >>
> > >>
> > > I actually prefer this method as you are building a grouped stream
> > > that you will aggregate. So
> > > table.grouped(...).withOptionalStuff().aggregate(..) etc seems natural
> > > to me.
> > >
> > >
> > >> I did not think this through in detail, but can't we just do the
> > >> initial proposal with the .table() ?
> > >>
> > >> groupedStream.count().withStoreName("name").mapValues(...)
> > >>
> > >> Each .withXXX(...) returns the current KTable and all the .withXXX()
> > >> are just added to the KTable interface. Or do I miss anything why this
> > >> won't work or any obvious disadvantage?
> > >>
> > >>
> > >>
> > > See above.
> > >
> > >
> > >>
> > >> -Matthias
> > >>
> > >> On 6/22/17 4:06 AM, Damian Guy wrote:
> > >>> Thanks everyone. My latest attempt is below. It builds on the fluent
> > >>> approach, but i think it is slightly nicer.
> > >>> I agree with some of what Eno said about mixing configy stuff in the
> > >>> DSL, but i think that enabling caching and enabling logging are things
> > >>> that aren't actually config. I'd probably not add withLogConfig(...)
> > >>> (even though it is below) as this is actually config and we already
> > >>> have a way of doing that, via the StateStoreSupplier. Arguably we
> > >>> could use the StateStoreSupplier for disabling caching etc, but as it
> > >>> stands that is a bit of a tedious process for someone that just wants
> > >>> to use the default storage engine, but not have caching enabled.
> > >>>
> > >>> There is also an orthogonal concern that Guozhang alluded to.... If
> > >>> you want to plug in a custom storage engine and you want it to be
> > >>> logged etc, you would currently need to implement that yourself.
> > >>> Ideally we can provide a way where we will wrap the custom store with
> > >>> logging, metrics, etc. I need to think about where this fits; it is
> > >>> probably more appropriate on the Stores API.
> > >>>
> > >>> final KeyValueMapper<String, String, Long> keyMapper = null;
> > >>> // count with mapped key
> > >>> final KTable<Long, Long> count = stream.grouped()
> > >>>         .withKeyMapper(keyMapper)
> > >>>         .withKeySerde(Serdes.Long())
> > >>>         .withValueSerde(Serdes.String())
> > >>>         .withQueryableName("my-store")
> > >>>         .count();
> > >>>
> > >>> // windowed count
> > >>> final KTable<Windowed<String>, Long> windowedCount = stream.grouped()
> > >>>         .withQueryableName("my-window-store")
> > >>>         .windowed(TimeWindows.of(10L).until(10))
> > >>>         .count();
> > >>>
> > >>> // windowed reduce
> > >>> final Reducer<String> windowedReducer = null;
> > >>> final KTable<Windowed<String>, String> windowedReduce = stream.grouped()
> > >>>         .withQueryableName("my-window-store")
> > >>>         .windowed(TimeWindows.of(10L).until(10))
> > >>>         .reduce(windowedReducer);
> > >>>
> > >>> final Aggregator<String, String, Long> aggregator = null;
> > >>> final Initializer<Long> init = null;
> > >>>
> > >>> // aggregate
> > >>> final KTable<String, Long> aggregate = stream.grouped()
> > >>>         .withQueryableName("my-aggregate-store")
> > >>>         .aggregate(aggregator, init, Serdes.Long());
> > >>>
> > >>> final StateStoreSupplier<KeyValueStore<String, Long>> stateStoreSupplier = null;
> > >>>
> > >>> // aggregate with custom store
> > >>> final KTable<String, Long> aggWithCustomStore = stream.grouped()
> > >>>         .withStateStoreSupplier(stateStoreSupplier)
> > >>>         .aggregate(aggregator, init);
> > >>>
> > >>> // disable caching
> > >>> stream.grouped()
> > >>>         .withQueryableName("name")
> > >>>         .withCachingEnabled(false)
> > >>>         .count();
> > >>>
> > >>> // disable logging
> > >>> stream.grouped()
> > >>>         .withQueryableName("q")
> > >>>         .withLoggingEnabled(false)
> > >>>         .count();
> > >>>
> > >>> // override log config
> > >>> final Reducer<String> reducer = null;
> > >>> stream.grouped()
> > >>>         .withLogConfig(Collections.singletonMap("segment.size", "10"))
> > >>>         .reduce(reducer);
> > >>>
> > >>>
> > >>> If anyone wants to play around with this you can find the code here:
> > >>> https://github.com/dguy/kafka/tree/dsl-experiment
> > >>>
> > >>> Note: It won't actually work as most of the methods just return null.
> > >>>
> > >>> Thanks,
> > >>> Damian
> > >>>
> > >>>
> > >>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ism...@juma.me.uk> wrote:
> > >>>
> > >>>> Thanks Damian. I think both options have pros and cons. And both are
> > >>>> better than overload abuse.
> > >>>>
> > >>>> The fluent API approach reads better, no mention of builder or build
> > >>>> anywhere. The main downside is that the method signatures are a
> > >>>> little less clear. By reading the method signature, one doesn't
> > >>>> necessarily know what it returns. Also, one needs to figure out the
> > >>>> special method (`table()` in this case) that gives you what you
> > >>>> actually care about (`KTable` in this case). Not major issues, but
> > >>>> worth mentioning while doing the comparison.
> > >>>>
> > >>>> The builder approach avoids the issues mentioned above, but it
> > >>>> doesn't read as well.
> > >>>>
> > >>>> Ismael
> > >>>>
> > >>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <damian....@gmail.com>
> > >> wrote:
> > >>>>
> > >>>>> Hi,
> > >>>>>
> > >>>>> I'd like to get a discussion going around some of the API choices
> > >>>>> we've made in the DSL. In particular those that relate to stateful
> > >>>>> operations (though this could expand).
> > >>>>> As it stands we lean heavily on overloaded methods in the API, i.e.,
> > >>>>> there are 9 overloads for KGroupedStream.count(..)! It is becoming
> > >>>>> noisy and i feel it is only going to get worse as we add more
> > >>>>> optional params. In particular we've had some requests to be able to
> > >>>>> turn caching off, or change log configs, on a per operator basis
> > >>>>> (note this can be done now if you pass in a StateStoreSupplier, but
> > >>>>> this can be a bit cumbersome).
> > >>>>>
> > >>>>> So this is a bit of an open question. How can we change the DSL
> > >>>>> overloads so that it flows, is simple to use and understand, and is
> > >>>>> easily extended in the future?
> > >>>>>
> > >>>>> One option would be to use a fluent API approach for providing the
> > >>>>> optional params, so something like this:
> > >>>>>
> > >>>>> groupedStream.count()
> > >>>>>    .withStoreName("name")
> > >>>>>    .withCachingEnabled(false)
> > >>>>>    .withLoggingEnabled(config)
> > >>>>>    .table()
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> Another option would be to provide a Builder to the count method,
> > >>>>> so it would look something like this:
> > >>>>> groupedStream.count(new
> > >>>>> CountBuilder("storeName").withCachingEnabled(false).build())
> > >>>>>
> > >>>>> Another option is to say: Hey we don't need this, what are you on
> > >>>>> about!
> > >>>>>
> > >>>>> The above has focussed on state store related overloads, but the
> > >>>>> same ideas could be applied to joins etc, where we presently have
> > >>>>> many join methods and many overloads.
> > >>>>>
> > >>>>> Anyway, i look forward to hearing your opinions.
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Damian
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >
> >
> >
>
>
> --
> -- Guozhang
>
>
