
I agree with some of Jan's points here. Interactive queries are a nice-to-have,
but not worth sacrificing clean interfaces over.
They are not the main use case of Kafka Streams, and implementing them via a
getQueryHandle on KTables means the related logic doesn't spread everywhere but
instead truly remains optional.

From: Jan Filipiak <jan.filip...@trivago.com>
Sent: 18 July 2017 16:21
To: Guozhang Wang; users@kafka.apache.org
Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring


Sorry for the delay, I couldn't answer earlier. I do understand
your point perfectly.
I just have a different perspective on what is going on. The most
crucial piece of abstraction, the KTable, is falling apart,
and that materializes (no pun intended) in many problems.

1. Too many overloads:
Currently, whenever a KTable is the result of an operation, it gets an
overload with stateStoreName and StateStoreSupplier in case people want
to query it.
This is what produces two thirds of the overloaded methods right now (not
counting methods returning KStream).

2. Code copy-and-pasting.
Almost all KTableProcessorSuppliers have the same block of
(if (name != null) store.put(k, v)).

3. Runtime inefficiencies.
Each queryable table almost always requires another store,
storing data equivalent to that of upstream KTables.
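Problems 2 and 3 can be made concrete with a small, self-contained Java sketch. It deliberately does not use the real Kafka Streams classes: `SourceProcessor` and `FilterProcessor` are hypothetical stand-ins, and a `HashMap` plays the role of a state store. It shows the `if (name != null) store.put(k, v)` block each processor repeats, and how a queryable filter keeps a second copy of rows its upstream source store already holds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-ins for KTable processors; NOT the real Kafka Streams internals.
// A HashMap plays the role of a key-value state store.
class SourceProcessor<K, V> {
    final Map<K, V> store = new HashMap<>(); // a table read from a topic is always materialized

    void process(K key, V value) {
        store.put(key, value);
    }
}

class FilterProcessor<K, V> {
    private final Predicate<V> predicate;
    private final String queryableName;      // null means "not queryable"
    final Map<K, V> store = new HashMap<>(); // only filled when queryableName != null

    FilterProcessor(Predicate<V> predicate, String queryableName) {
        this.predicate = predicate;
        this.queryableName = queryableName;
    }

    void process(K key, V value) {
        V result = (value != null && predicate.test(value)) ? value : null;
        // The block that gets copy-pasted into almost every KTable processor (problem 2):
        if (queryableName != null) {
            if (result == null) {
                store.remove(key);
            } else {
                store.put(key, result);
            }
        }
        // Forwarding `result` downstream is omitted in this sketch.
    }
}

class DuplicateStoreDemo {
    public static void main(String[] args) {
        SourceProcessor<String, Integer> source = new SourceProcessor<>();
        FilterProcessor<String, Integer> filter = new FilterProcessor<>(v -> v > 10, "filtered-store");
        source.process("a", 5);
        filter.process("a", 5);
        source.process("b", 42);
        filter.process("b", 42);
        // "b" -> 42 is now held by both stores (problem 3).
        System.out.println(source.store.get("b") + " " + filter.store.get("b"));
    }
}
```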

So I really see us tackling only the first problem currently, which in my
opinion is too short-sighted to settle on a public API.
This is why I want to tackle our approach to IQ first, as it seems to me
to be the most disruptive thing and the cause of most problems.

The Plan:

Table from topic, kstream (I don't even like this one, but it is probably needed
for some kind of enhanced flexibility) or aggregations would be the only
KTables that would get associated with a state store (their processors).
For these operations one can have the "StateStoreSupplier" overload but
not the "queryable state store" overload. From this point on, the KTable
abstraction would be considered restored.
All the overloads of join and through with respect to IQ would go away.
"through" might go away completely, depending on how much benefit it adds. The method I
would add is for a table to get a QueryHandle.
This query handle would underneath remember its table's processor name. To
access the data from IQ we would not rely on the "per processor
state store" but go the usual path through ValueGetterSupplier.
*Note:* We do not necessarily have a Serde for V, especially after
mapValues, and also not for any intermediate data types. It would be each
KTableProcessor's job to provide a serialized version of upstream data types.
KTableKTableJoin would need to bring a JoinInputSerializer<V1,V2> that
would serialize both upstream values for transport across boxes.
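The plan above can be sketched in plain Java to show the mechanics. Everything here is hypothetical: `ValueGetter`, `SourceTable`, `Getters`, `QueryHandle` and `StringIntJoinInputSerializer` are illustrative stand-ins rather than Kafka Streams types, and this `QueryHandle` holds the getter directly instead of a processor name. Only the source tables own a store; `mapValues` and the join answer lookups by calling their parents' getters, and a `JoinInputSerializer` encodes the two upstream values so that the joined type needs no Serde of its own.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch of "IQ through value getters"; none of these are real Kafka Streams types.
interface ValueGetter<K, V> {
    V get(K key);
}

// Only tables from a topic, a KStream or an aggregation own a store.
class SourceTable<K, V> {
    final Map<K, V> store = new HashMap<>();

    ValueGetter<K, V> valueGetter() {
        return store::get;
    }
}

final class Getters {
    private Getters() {}

    // mapValues keeps no store: it re-applies the mapper on top of its parent's getter.
    static <K, V, R> ValueGetter<K, R> mapValues(ValueGetter<K, V> parent, Function<V, R> mapper) {
        return key -> {
            V value = parent.get(key);
            return value == null ? null : mapper.apply(value);
        };
    }

    // An inner KTable-KTable join keeps no store either: it joins both upstream getters on demand.
    static <K, V1, V2, R> ValueGetter<K, R> join(ValueGetter<K, V1> left, ValueGetter<K, V2> right,
                                                 BiFunction<V1, V2, R> joiner) {
        return key -> {
            V1 v1 = left.get(key);
            V2 v2 = right.get(key);
            return (v1 == null || v2 == null) ? null : joiner.apply(v1, v2);
        };
    }
}

// A query handle only remembers which getter (i.e. which processor) to ask.
class QueryHandle<K, V> {
    private final ValueGetter<K, V> getter;

    QueryHandle(ValueGetter<K, V> getter) {
        this.getter = getter;
    }

    V get(K key) {
        return getter.get(key);
    }
}

// Hypothetical JoinInputSerializer: ships both upstream values to another instance,
// so the joined result type never needs a Serde of its own.
interface JoinInputSerializer<V1, V2> {
    byte[] serialize(V1 left, V2 right);

    Map.Entry<V1, V2> deserialize(byte[] bytes);
}

// Toy "left|right" encoding; assumes non-null values and no '|' inside the left value.
class StringIntJoinInputSerializer implements JoinInputSerializer<String, Integer> {
    public byte[] serialize(String left, Integer right) {
        return (left + "|" + right).getBytes(StandardCharsets.UTF_8);
    }

    public Map.Entry<String, Integer> deserialize(byte[] bytes) {
        String[] parts = new String(bytes, StandardCharsets.UTF_8).split("\\|", 2);
        return new AbstractMap.SimpleEntry<>(parts[0], Integer.valueOf(parts[1]));
    }
}
```

With `u1 -> alice` in one source, `u1 -> 7` in the other, `String::toUpperCase` as the mapper and `(n, s) -> n + ":" + s` as the joiner, a handle over the join returns `ALICE:7` for `u1` without any store beyond the two sources.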

This first step would kill all the "storeName"-based overloads + many
StateStore overloads. It would also avoid the bloated copy-pasting in
each KTableProcessor for maintaining the store.
It would also make the runtime more efficient in that it does not
store the same data twice just for access from IQ, tackling not only problem
1 but all three problems mentioned above.

From here, ~3 or 4 methods (from kstream, topic or aggregate) would still
be stuck with a StateStoreSupplier overload. For me, this is quite an
improvement already. To reduce further overloads, I am thinking of
adding a nullable Properties parameter to these operations. If people want to use
all defaults they could pass in null and it wouldn't be too painful.
That doesn't necessarily require
them to have config files lying around. They could, if they wanted, use
property files to create such properties, and we would also offer to look for
configs in the streams properties.
So the complexity of distributing property files is optional, and the
user might choose to fill the configs by code or files.
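A minimal sketch of the lookup order this implies: explicit per-operation Properties (possibly null), then the streams properties, then a built-in default. `OperationConfig`, `scope()` and `resolve()` are hypothetical, not an existing Kafka Streams API, and the key layout `streams.<applicationId>.<operationType>.<operationName>.<key>` is an assumption modeled on the `streams.$applicationID.joins.$joinname.conf` scheme Jan suggests elsewhere in this thread.

```java
import java.util.Properties;

// Hypothetical resolver for "nullable Properties per operation, falling back to the
// streams properties". Not an existing Kafka Streams API; the key layout is an assumption.
final class OperationConfig {
    private OperationConfig() {}

    // e.g. scope("my-app", "aggregations", "word-counts") -> "streams.my-app.aggregations.word-counts"
    static String scope(String applicationId, String operationType, String operationName) {
        return "streams." + applicationId + "." + operationType + "." + operationName;
    }

    static String resolve(Properties operationProps, Properties streamsProps,
                          String scope, String key, String defaultValue) {
        // 1) Explicit per-operation Properties win; passing null means "use the defaults".
        if (operationProps != null) {
            String explicit = operationProps.getProperty(key);
            if (explicit != null) {
                return explicit;
            }
        }
        // 2) Otherwise look in the global streams properties under the scoped key.
        String scoped = streamsProps.getProperty(scope + "." + key);
        if (scoped != null) {
            return scoped;
        }
        // 3) Fall back to the built-in default.
        return defaultValue;
    }
}

class OperationConfigDemo {
    public static void main(String[] args) {
        Properties streamsProps = new Properties(); // could equally be loaded from a file
        streamsProps.setProperty("streams.my-app.aggregations.word-counts.caching.enabled", "false");
        String scope = OperationConfig.scope("my-app", "aggregations", "word-counts");

        // null: no per-operation overrides, so the streams properties (then defaults) apply
        System.out.println(OperationConfig.resolve(null, streamsProps, scope, "caching.enabled", "true"));
    }
}
```

Whether the streams-level keys come from code or from a property file makes no difference to the resolver, which is the "optional complexity" point above.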

I think these steps can rescue the proper abstraction of a KTable. I
believe that with the current proposals we are only sugarcoating problem
1 and will end up with a broken idea of what a KTable is.
I think it will be even harder to develop further from there. Interface-wise,
my proposal is like developing backwards, as I am very certain we
took a wrong turn with IQ that we shouldn't try to carry through.

I hope I could explain how this refactoring can tackle the 3 problems
above, and especially why I don't think we can win by tackling only
point 1 in the long run.
If anything needs an implementation draft, please feel free to ask
me to provide one. Initially, the proposal would hopefully get the job
done of just removing clutter.

Looking forward to your comments.

Best Jan

On 12.07.2017 21:27, Guozhang Wang wrote:
> Hello Jan,
> Thanks for your feedback. Let me try to clarify a few things about the
> problems that we are trying to resolve and the motivations behind the
> current proposals.
> As Matthias mentioned, one issue that we are trying to tackle is to
> reduce the number of overloaded functions in the DSL due to serde
> overrides / state store supplier overrides that are needed for
> repartitioning, or for state store materializations. Another related
> issue is that the current overridden state store supplier is not very
> natural to use, for example:
> 1) If a user just wants to disable caching / logging etc. but does not
> want to change the underlying store engine at all, she needs to
> know, for example, whether a windowed store or key-value store is
> needed for this specific operator in the DSL and what serdes are needed
> to materialize the store, in order to create a StateStoreSupplier
> with caching / logging disabled, and then pass it into the DSL.
> 2) Similarly, if a user just wants to set different topic configs for
> the changelog topic, she still needs to pass the whole
> StateStoreSupplier into the operator.
> 3) If a user wants to use a different store engine (e.g. MyStore rather than
> RocksDBStore) underneath but does not care about the default settings
> for logging, caching, etc., he STILL needs to pass in the whole
> StateStoreSupplier into the operator.
> Note that all the above scenarios are for advanced users who do want
> to override these settings; users who are just OK with the default
> settings should not be exposed to such APIs at all. Like you
> said, "I do not be exposed with any of such implementation details",
> if you do not care.
> -----------------
> We have been talking about configs vs. code for such settings,
> since we have been using configs for "global" default configs; but the
> argument against also using configs for such per-operator / per-store
> settings is that it will simply make configs hard to manage /
> hard to wire with tools. Personally speaking, I'm not a big fan of
> using configs for per-entity overrides, and that is mainly from my
> experience with Samza: Samza uses exactly the same approach for
> per-stream / per-source configs:
> http://samza.apache.org/learn/documentation/0.13/jobs/configuration-table.html
> ([system-name][stream-id]
> etc. are all placeholders)
> The main issues were: 1) users making config changes need to deploy
> them to all the instances; I think for Streams it would be even worse,
> as we need to maintain a config file on each of the running instances, and
> whenever there is a change we need to make sure it is propagated to
> all of them; 2) whenever users make some code changes, e.g. to add a
> new stream / system, they need to remember to make the corresponding
> changes in the config files as well, and they kept forgetting about it.
> The lesson learned there was that it is always better to change one
> place (code change) than two (code change + config file change).
> Again, this is not saying we have vetoed this option, and if people
> have good reasons for this let's discuss them here.
> -----------------
> So the current proposals are mainly around keeping configs for the
> global default settings, while still allowing users to override
> per-operator / per-store settings in the code, and also keeping in
> mind not to force users to think about such implementation details if
> they are fine with the default settings. For example:
> As a normal user, it is sufficient to specify a join as
> ```
> table4.join(table5, joiner).table();
> ```
> in which she can still just focus on the computational logic with all
> implementation details abstracted away; only if the user is familiar
> enough with the implementation details (e.g. how the joining tables are
> materialized into state stores, etc.) and wants to specify her own
> settings (e.g. I want to swap in my own state store engine, or I want
> to disable caching for dedup, or use a different serde, etc.) can she
> "explore" them with the DSL again:
> ```
> // use a custom store name for interactive query
> table4.join(table5, joiner).table(Materialized.as("store1"));
> // use a custom store engine
> table4.join(table5, joiner).table(Materialized.as(MyStoreSupplier));
> // use custom changelog topic configs
> table4.join(table5, joiner).table(Materialized.as("store1").withLoggingEnabled(configs));
> // ... more
> ```
> Hope it helps.
> Guozhang
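The three scenarios above boil down to "override one setting, keep the defaults for everything else". A minimal Java sketch of that idea follows. `StoreSpec` is hypothetical and is not the `Materialized` class from the proposal (whose final shape was still being discussed); store engines are represented by plain strings purely for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-store options object: every field starts at a default, so a user only
// states what she wants to change. Engines are plain strings here purely for illustration.
class StoreSpec {
    String storeName;                  // null: an internal name would be generated
    String engine = "rocksdb";         // default engine unless swapped
    boolean cachingEnabled = true;
    boolean loggingEnabled = true;
    Map<String, String> changelogConfigs = Collections.emptyMap();

    static StoreSpec defaults() {
        return new StoreSpec();
    }

    static StoreSpec as(String storeName) {
        StoreSpec spec = new StoreSpec();
        spec.storeName = storeName;
        return spec;
    }

    // Scenario 3: swap the engine, keep default caching/logging.
    StoreSpec withEngine(String engine) {
        this.engine = engine;
        return this;
    }

    // Scenario 1: disable caching without knowing which store type or serdes are needed.
    StoreSpec withCachingDisabled() {
        this.cachingEnabled = false;
        return this;
    }

    // Scenario 2: only change the changelog topic configs.
    StoreSpec withLoggingEnabled(Map<String, String> configs) {
        this.loggingEnabled = true;
        this.changelogConfigs = new HashMap<>(configs);
        return this;
    }
}

class StoreSpecDemo {
    public static void main(String[] args) {
        StoreSpec spec = StoreSpec.as("store1").withCachingDisabled();
        System.out.println(spec.storeName + " " + spec.engine + " caching=" + spec.cachingEnabled);
    }
}
```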
> On Fri, Jul 7, 2017 at 3:42 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>     It makes me want to cry.
>     Why on earth is the DSL going to expose all its implementation
>     details now, especially whether something is materialized or not?
>     If we want to take useful steps in that direction, maybe we are
>     looking for a way to let the user switch back and forth between
>     PAPI and DSL?
>     A change like the proposed one would not eliminate any of my pain points
>     while still being a heck of a lot of work to migrate to.
>     Since I am only following this from the point where Eno CC'ed it
>     into the users list:
>     Can someone please rephrase for me what problem this is trying to
>     solve? I don't mean to be rude, but it uses a problematic feature,
>     "StateStoreSuppliers in DSL", to justify making it even worse. This
>     helps us nowhere in making the configs more flexible; it's just
>     syntactic sugar.
>     A low-effort shot like: let's add a Properties parameter to operations that
>     would otherwise become too heavily overloaded? Or pull the configs by
>     some naming schema
>     from the overall properties. In addition to that, we get rid of
>     StateStoreSuppliers in the DSL and have them also configured by
>     said properties.
>     => way easier to migrate to, way less risk, way more flexible in
>     the future (different implementations of the same operation don't
>     require code change to configure)
>     Line 184 makes especially no sense to me. What is a non-materialized
>     KTable-KTable join anyway?
>     Hope we can discuss more on this.
>     On 07.07.2017 17:23, Guozhang Wang wrote:
>         I messed up the indentation in the GitHub code repo; this would be
>         easier to read:
>         https://codeshare.io/GLWW8K
>         Guozhang
>         On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang
>         <wangg...@gmail.com> wrote:
>             Hi Damian / Kyle,
>             I think I agree with you guys about the pros / cons of
>             using the builder pattern vs. using some "secondary classes".
>             And I'm wondering if we can take a "middle" approach between
>             these two. I spent some time on a slightly different approach
>             from Damian's current proposal:
> https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/java/org/apache/kafka/streams/RefactoredAPIs.java
>             The key idea is to allow the final "table()" or "stream()"
>             function to "upgrade" from the secondary classes to the
>             first-class citizen classes, while having all the specs inside
>             this function. This proposal also includes some other
>             refactoring of the builder that people have been discussing,
>             to reduce the overloaded functions as well. WDYT?
>             Guozhang
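The "secondary classes upgraded by a final table() call" idea can be illustrated with a toy, in-memory DSL. All class names are hypothetical and nothing here is Kafka Streams code; the point is only that `groupByKey()`, `count()` and `withStoreName()` merely collect a spec, and the processor is registered with the topology when the terminal `table()` is called.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy DSL (not Kafka Streams): intermediate "secondary" objects only collect
// specs; nothing is added to the topology until the terminal table() call.
class ToyTopology {
    final List<String> processors = new ArrayList<>();
}

class ToyStream<K> {
    private final ToyTopology topology;
    private final List<K> keys; // record keys; values are irrelevant for counting

    ToyStream(ToyTopology topology, List<K> keys) {
        this.topology = topology;
        this.keys = keys;
    }

    ToyGroupedStream<K> groupByKey() {
        return new ToyGroupedStream<>(topology, keys); // a no-op until an aggregation is chosen
    }
}

class ToyGroupedStream<K> {
    private final ToyTopology topology;
    private final List<K> keys;

    ToyGroupedStream(ToyTopology topology, List<K> keys) {
        this.topology = topology;
        this.keys = keys;
    }

    ToyCountSpec<K> count() {
        return new ToyCountSpec<>(topology, keys); // secondary class, not yet a table
    }
}

class ToyCountSpec<K> {
    private final ToyTopology topology;
    private final List<K> keys;
    private String storeName = "count-store"; // default used if the user says nothing

    ToyCountSpec(ToyTopology topology, List<K> keys) {
        this.topology = topology;
        this.keys = keys;
    }

    ToyCountSpec<K> withStoreName(String storeName) {
        this.storeName = storeName;
        return this;
    }

    // Terminal call: "upgrades" the spec into a first-class result and registers the processor.
    Map<K, Long> table() {
        topology.processors.add("count:" + storeName);
        Map<K, Long> counts = new HashMap<>();
        for (K key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }
}

class ToyDslDemo {
    public static void main(String[] args) {
        ToyTopology topology = new ToyTopology();
        List<String> keys = new ArrayList<>(List.of("a", "b", "a"));
        Map<String, Long> counts = new ToyStream<>(topology, keys).groupByKey().count().table();
        System.out.println(counts + " " + topology.processors);
    }
}
```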
>             On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy
>             <damian....@gmail.com> wrote:
>                 Hi Jan,
>                 Thanks very much for the input.
>                 On Tue, 4 Jul 2017 at 08:54 Jan Filipiak
>                 <jan.filip...@trivago.com> wrote:
>                     Hi Damian,
>                     I do see your point that something needs to change.
>                     But I fully agree with
>                     Guozhang when he says:
>                     ---
>                     But since this is an incompatible change, and we
>                     are going to remove the compatibility annotations
>                     soon, it means we only have one chance and we
>                     really have to make it right.
>                     ----
>                 I think we all agree on this one! Hence the discussion.
>                     I fear all suggestions do not go far enough to
>                     become something that will carry on for very much
>                     longer.
>                     I am currently working on KAFKA-3705 and trying to
>                     find the easiest way for the user to give me all the
>                     required functionality. The easiest interface I could
>                     come up with so far can be looked at here.
> https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622
>                     And it's already horribly complicated. I am currently
>                     unable to find the right abstraction level to have
>                     everything falling into place naturally.
>                 To be fair, that is not a particularly easy problem to
>                 solve!
>                     To be honest, I already think introducing
> https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L493
>                     was unideal and makes everything a mess.
>                 I'm not sure I agree that it makes everything a mess,
>                 but it could have been done differently.
>                     The JoinType:Whatever is also not really flexible. Two
>                     things come to my mind:
>                     1. I don't think we should rule out config-based
>                     decisions, say configs like
>                      streams.$applicationID.joins.$joinname.conf = value
>                 Is this just for config? Or are you suggesting that we
>                 could somehow "code" the join in a config file?
>                     This can allow for tremendous changes without a
>                     single API change, and IMO it was not considered
>                     enough yet.
>                     2. Push logic from the DSL to the callback
>                     classes. A ValueJoiner, for example, can be used to
>                     implement different join types as the user wishes.
>                 Do you have an example of how this might look?
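One possible reading of Jan's point 2, sketched in plain Java: the DSL offers a single outer-join primitive that passes a missing side as null, and the joiner decides which keys survive by returning null. `CallbackJoin` is hypothetical, maps stand in for the two tables, and this is not how Kafka Streams implements joins; it only illustrates how a ValueJoiner-style callback could encode inner vs. left semantics.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

// Hypothetical illustration of "push the join type into the callback": one outer-join
// primitive; the joiner drops a key by returning null. NOT how Kafka Streams implements
// joins. The maps stand in for the two tables and are assumed to contain no null values.
final class CallbackJoin {
    private CallbackJoin() {}

    static <K, V1, V2, R> Map<K, R> outerJoin(Map<K, V1> left, Map<K, V2> right,
                                              BiFunction<V1, V2, R> joiner) {
        Set<K> keys = new LinkedHashSet<>(left.keySet());
        keys.addAll(right.keySet());
        Map<K, R> result = new LinkedHashMap<>();
        for (K key : keys) {
            // A missing side is passed as null, exactly like an outer join would do.
            R joined = joiner.apply(left.get(key), right.get(key));
            if (joined != null) {
                result.put(key, joined);
            }
        }
        return result;
    }
}

class CallbackJoinDemo {
    public static void main(String[] args) {
        Map<String, Integer> left = new LinkedHashMap<>();
        left.put("a", 1);
        left.put("b", 2);
        Map<String, String> right = new LinkedHashMap<>();
        right.put("b", "x");
        right.put("c", "y");

        // Inner join semantics: drop the key unless both sides are present.
        Map<String, String> inner = CallbackJoin.outerJoin(left, right,
                (l, r) -> (l == null || r == null) ? null : l + r);
        // Left join semantics: drop the key only if the left side is missing.
        Map<String, String> leftJoin = CallbackJoin.outerJoin(left, right,
                (l, r) -> l == null ? null : l + "/" + r);
        System.out.println(inner + " " + leftJoin);
    }
}
```

The trade-off, as Matthias argues further down, is that the join type then stops being a visible first-class concept in the API.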
>                     As Guozhang said: not breaking users is very
>                     important,
>                 Of course. We want to make it as easy as possible for
>                 people to use Streams.
>                     especially with these changes + all the plans I sadly
>                     only have in my head, but hopefully the first link can
>                     give a glimpse.
>                     Thanks for preparing the examples; they made it way
>                     clearer to me what exactly we are talking about. I
>                     would argue to go a bit slower and more carefully on
>                     this one. At some point we need to get it right.
>                     Peeking over to the Hadoop guys with their huge user
>                     base: config files really work well for them.
>                     Best Jan
>                     On 30.06.2017 09:31, Damian Guy wrote:
>                         Thanks Matthias
>                         On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax
>                         <matth...@confluent.io> wrote:
>                             I am just catching up on this thread, so
>                             sorry for the long email in advance... Also,
>                             it's to some extent a dump of thoughts and not
>                             always a clear proposal. Still need to think
>                             about this in more detail. But maybe it helps
>                             others to get new ideas :)
>                                     However, I don't understand your
>                                     argument about putting aggregate()
>                                     after the withXX() -- all the
>                                     calls to withXX() set optional
>                                     parameters for aggregate() and not for
>                                     groupBy() -- but a groupBy().withXX()
>                                     indicates that the withXX()
>                                     belongs to the groupBy(). IMHO, this
>                                     might be quite confusing for developers.
>                                 I see what you are saying, but the
>                                 grouped stream is effectively a no-op
>                                 until you call one of the
>                                 aggregate/count/reduce etc. functions. So
>                                 the optional params are ones that are
>                                 applicable to any of the operations you
>                                 can perform on this grouped stream.
>                                 Then the final
>                                 count()/reduce()/aggregate() call has
>                                 any of the params that are
>                                 required/specific to that function.
>                             I understand your argument, but I don't
>                             share the conclusion. If we
>                             need a "final/terminal" call, the better
>                             way might be
>                             .groupBy().count().withXX().build()
>                             (with a better name for build() though)
>                         The point is that all the other calls,
>                         i.e., withBlah, windowed, etc., apply to all
>                         the aggregate functions. The terminal call is
>                         the actual type of aggregation you want to do.
>                         I personally find this more natural than
>                         groupBy().count().withBlah().build()
>                                 groupedStream.count(/** non windowed count **/)
> groupedStream.windowed(TimeWindows.of(10L)).count(...)
> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
>                             I like this. However, I don't see a reason
>                             to have windowed() and sessionWindowed(). We
>                             should have one top-level `Windows` interface
>                             that both `TimeWindows` and `SessionWindows`
>                             implement and just have a single windowed()
>                             method that accepts all `Windows`. (I did not
>                             like the separation of `SessionWindows` in the
>                             first place, and this seems to be an
>                             opportunity to clean this up. It was hard to
>                             change when we introduced session windows.)
>                         Yes - true we should look into that.
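Matthias's suggestion of one top-level `Windows` type can be sketched as follows. The `Windows` interface, `TimeWindowsSketch`, `SessionWindowsSketch` and `GroupedStreamSketch` are hypothetical stand-ins rather than the real Kafka Streams classes; the only point is that a single `windowed(Windows)` entry point can accept both kinds of windows, so `sessionWindowed()` is no longer needed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not the real Kafka Streams classes): one top-level Windows type that
// both time and session windows implement, so a single windowed(Windows) method suffices.
interface Windows {
    String describe();
}

final class TimeWindowsSketch implements Windows {
    private final long sizeMs;

    TimeWindowsSketch(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    @Override
    public String describe() {
        return "time(size=" + sizeMs + "ms)";
    }
}

final class SessionWindowsSketch implements Windows {
    private final long inactivityGapMs;

    SessionWindowsSketch(long inactivityGapMs) {
        this.inactivityGapMs = inactivityGapMs;
    }

    @Override
    public String describe() {
        return "session(gap=" + inactivityGapMs + "ms)";
    }
}

class GroupedStreamSketch {
    final List<String> plan = new ArrayList<>();

    // One entry point instead of windowed(...) plus sessionWindowed(...).
    GroupedStreamSketch windowed(Windows windows) {
        plan.add("windowed " + windows.describe());
        return this;
    }

    GroupedStreamSketch count() {
        plan.add("count");
        return this;
    }
}

class WindowsDemo {
    public static void main(String[] args) {
        System.out.println(new GroupedStreamSketch().windowed(new TimeWindowsSketch(10L)).count().plan);
        System.out.println(new GroupedStreamSketch().windowed(new SessionWindowsSketch(10L)).count().plan);
    }
}
```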
>                             Btw: we do use the imperative groupBy()
>                             and groupByKey(), and thus we
>                             might also want to use windowBy() (instead
>                             of windowed()). Not sure how
>                             important this is, but it seems to be
>                             inconsistent otherwise.
>                         Makes sense
>                             About joins: I don't like
>                             .withJoinType(JoinType.LEFT) at all. I think
>                             defining an inner/left/outer join is not
>                             an optional argument but a
>                             first-class concept and should have a
>                             proper representation in the API
>                             (like the current methods join(),
>                             leftJoin(), outerJoin()).
>                         Yep, I did originally have it as a required
>                         param and maybe that is what we
>                         go with. It could have a default, but maybe
>                         that is confusing.
>                             About the two join API proposals, the
>                             second one has too much boilerplate
>                             code for my taste. Also, the actual
>                             join() operator has only one
>                             argument, which is weird to me, as in my
>                             thinking process the main
>                             operator call should have one parameter
>                             per mandatory argument, but your
>                             proposal puts the mandatory arguments into the
>                             Joins.streamStreamJoin() call.
>                             This is far from intuitive IMHO.
>                         This is the builder pattern; you only need one
>                         param as the builder has
>                         captured all of the required and optional
>                         arguments.
>                             The first join proposal also seems to
>                             align better with the pattern
>                             suggested for aggregations, and having the
>                             same pattern for all operators
>                             is important (as you stated already).
>                         This is why I offered two alternatives to
>                         start with. One is the
>                         builder pattern, the other is the more fluent
>                         pattern.
>                             Coming back to the config vs optional
>                             parameter. What about having a
>                             method withConfig[s](...) that allows
>                             putting in the configuration?
>                         Sure, it is currently called withLogConfig()
>                         as that is the only thing that
>                         is really config.
>                             This also raises the question of whether until()
>                             is a windows property.
>                             Actually, until() seems to be a
>                             configuration parameter and thus
>                             should not have its own method.
>                         Hmmm, I don't agree. Until is a property of
>                         the window. It is going to be
>                         potentially different for every window
>                         operation you do in a streams app.
>                             Browsing through your example DSL branch, I
>                             also saw this one:
>                                 final KTable<Windowed<String>, Long> windowed =
>                                     groupedStream.counting()
>                                         .windowed(TimeWindows.of(10L).until(10))
>                                         .table();
>                             This is an interesting idea, and it reminds
>                             me of some feedback about "I
>                             wanted to count a stream, but there was no
>                             count() method -- I first
>                             needed to figure out that I need to group
>                             the stream first to be able
>                             to count it. It does make sense in
>                             hindsight but was not obvious in the
>                             beginning". Thus, carrying out this
>                             thought, we could also do the
>                             following:
>                             stream.count().groupedBy().windowedBy().table();
>                             -> Note, I use "grouped" and "windowed"
>                             instead of imperative here, as
>                             it comes after the count()
>                             This would be more consistent than your
>                             proposal (that has grouping
>                             before but windowing after count()). It
>                             might even allow us to enrich
>                             the API with some syntactic sugar like
>                             `stream.count().table()` to get
>                             the overall count of all records (this
>                             would obviously not scale, but we
>                             could support it -- if not now, maybe later).
>                         I guess I'd prefer
>                         stream.groupBy().windowBy().count()
>                         stream.groupBy().windowBy().reduce()
>                         stream.groupBy().count()
>                         As I said above, everything that happens
>                         before the final aggregate call
>                         can be applied to any of them. So it makes
>                         sense to me to do those things
>                         ahead of the final aggregate call.
>                             Last, about the builder pattern. I am convinced
>                             that we need some "terminal"
>                             operator/method that tells us when to add
>                             the processor to the topology.
>                             But I don't see the need for a plain
>                             builder pattern that feels alien to
>                             me (see my argument about the second join
>                             proposal). Using .stream() /
>                             .table() as used in many examples might
>                             work. But maybe a more generic
>                             name that we can use in all places like
>                             build() or apply() might also be
>                             an option.
>                         Sure, a generic name might be ok.
>                             -Matthias
>                             On 6/29/17 7:37 AM, Damian Guy wrote:
>                                 Thanks Kyle.
>                                 On Thu, 29 Jun 2017 at 15:11 Kyle
>                                 Winkelman <winkelman.k...@gmail.com> wrote:
>                                     Hi Damian,
>                                                     When trying to program in the
>                                                     fluent API that has been discussed most,
>                                                     it feels difficult to know when you will
>                                                     actually get an object you can reuse.
>                                                     What if I make one KGroupedStream that I
>                                                     want to reuse? Is it legal to reuse it,
>                                                     or does this approach expect you to call
>                                                     grouped each time?
>                                             I'd anticipate that once
>                                             you have a KGroupedStream
>                                             you can re-use it as you
>                                             can today.
>                                     You said it yourself in another
>                                     post that the grouped stream is
>                                     effectively a no-op until a count,
>                                     reduce, or aggregate. The way I see
>                                     it, you wouldn't be able to reuse
>                                     anything except KStreams and KTables,
>                                     because most of this fluent API would
>                                     continue returning this (this being
>                                     the builder object currently being
>                                     manipulated).
>                                     So, if you ever store a reference to
>                                     anything but KStreams and KTables
>                                     and you use it in two different ways,
>                                     then it's possible you make conflicting
>                                     withXXX() calls on the same builder.
>                                 No necessarily true. It could return a
>                                 new instance of the builder,
>                     i.e.,
>                                 the builders being immutable. So if
>                                 you held a reference to the
>                 builder
>                             it
>                                 would always be the same as it was
>                                 when it was created.
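Damian's point that withXXX() could return a new instance can be sketched in plain Java. The `GroupedOptions` class below is hypothetical (it is not a Kafka Streams type) and serdes are represented by plain strings; the sketch only shows that, with immutable builders, a reference such as Kyle's `groupedStreamWithDefaultSerdes` keeps its original settings after `withKeySerde(...)` and `withValueSerde(...)` are chained off it.

```java
import java.util.Objects;

// Hypothetical sketch: GroupedOptions is NOT a Kafka Streams class.
// Every withXXX() call returns a new instance, so stored references never change.
final class ImmutableBuilderDemo {

    static final class GroupedOptions {
        private final String keySerde;   // serdes are plain strings here for brevity
        private final String valueSerde;

        private GroupedOptions(String keySerde, String valueSerde) {
            this.keySerde = keySerde;
            this.valueSerde = valueSerde;
        }

        static GroupedOptions defaults() {
            return new GroupedOptions("default", "default");
        }

        GroupedOptions withKeySerde(String serde) {
            return new GroupedOptions(Objects.requireNonNull(serde), valueSerde);
        }

        GroupedOptions withValueSerde(String serde) {
            return new GroupedOptions(keySerde, Objects.requireNonNull(serde));
        }

        String keySerde() { return keySerde; }
        String valueSerde() { return valueSerde; }
    }

    public static void main(String[] args) {
        GroupedOptions withDefaultSerdes = GroupedOptions.defaults();
        GroupedOptions withDeclaredSerdes =
                withDefaultSerdes.withKeySerde("string").withValueSerde("long");

        // The first reference still holds its original settings.
        System.out.println(withDefaultSerdes.keySerde() + "/" + withDefaultSerdes.valueSerde());   // default/default
        System.out.println(withDeclaredSerdes.keySerde() + "/" + withDeclaredSerdes.valueSerde()); // string/long
    }
}
```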
>                                     GroupedStream<K,V> groupedStreamWithDefaultSerdes = kStream.grouped();
>                                     GroupedStream<K,V> groupedStreamWithDeclaredSerdes = groupedStreamWithDefaultSerdes.withKeySerde(...).withValueSerde(...);
>                                     I'll admit that this shouldn't happen, but some user is going to do it eventually. Depending on the implementation, uses of groupedStreamWithDefaultSerdes would most likely be equivalent to the version withDeclaredSerdes. One workaround would be to always make copies of the config objects you are building, but this approach has its own problem, because now we have to identify which configs are equivalent so we don't create repeated processors.
>                                     The point of this long-winded example is that we always have to be thinking about all of the possible ways it could be misused by a user (causing them to see hard-to-diagnose problems).
>                                 Exactly! That is the point of the discussion really.
>                                     In my attempt at a couple of methods with builders, I feel that I could confidently say the user couldn't really mess it up.
>                                         // Count
>                                         KTable<String, Long> count =
>                                             kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));
>                                     The kGroupedStream is reusable, and if they attempted to reuse the Count for some reason, it would throw an error message saying that a store named "my-store" already exists.
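Kyle's expectation that reusing the same `Count` (and therefore the same store name) would fail fast can be illustrated with a registry that rejects duplicate names. `TopologyRegistry` and `registerStore` are illustrative names only; this is a sketch of the idea, not how Kafka Streams actually validates store names.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: TopologyRegistry and registerStore are NOT Kafka Streams APIs.
// It fails fast when two operations claim the same queryable store name.
final class StoreNameCheckDemo {

    static final class TopologyRegistry {
        private final Set<String> storeNames = new HashSet<>();

        void registerStore(String storeName) {
            // Set.add returns false if the name was already registered.
            if (!storeNames.add(storeName)) {
                throw new IllegalStateException(
                        "A store named \"" + storeName + "\" already exists");
            }
        }
    }

    public static void main(String[] args) {
        TopologyRegistry registry = new TopologyRegistry();
        registry.registerStore("my-store");       // first count(...) using "my-store"
        try {
            registry.registerStore("my-store");   // reusing the same Count options
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());   // A store named "my-store" already exists
        }
    }
}
```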
>                                 Yes, I agree, and I think using builders is my preferred pattern.
>                                 Cheers,
>                                 Damian
>                                     Thanks,
>                                     Kyle
>                                     From: Damian Guy
>                                     Sent: Thursday, June 29, 2017 3:59 AM
>                                     To: d...@kafka.apache.org <mailto:d...@kafka.apache.org>
>                                     Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>                                         Hi Kyle,
>                                         Thanks for your input. Really appreciated.
>                                         On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <winkelman.k...@gmail.com <mailto:winkelman.k...@gmail.com>> wrote:
>                                             I like more of a builder pattern even though others have voiced against it. The reason I like it is because it makes it clear to the user that a call to KGroupedStream#count will return a KTable, not some intermediate class that I need to understand.
>                                         Yes, that makes sense.
>                                             When trying to program in the fluent API that has been discussed most, it feels difficult to know when you will actually get an object you can reuse. What if I make one KGroupedStream that I want to reuse, is it legal to reuse it or does this approach expect you to call grouped each time?
>                                         I'd anticipate that once you have a KGroupedStream you can re-use it as you can today.
>                                             This question doesn't pop into my head at all in the builder pattern; I assume I can reuse everything. Finally, I like .groupByKey and .groupBy(KeyValueMapper); not a big fan of the grouped.
>                                         Yes, grouped() was more for demonstration and because groupBy() and groupByKey() were taken! So I'd imagine the API would actually want to be groupByKey(/** no required args **/).withOptionalArg() and groupBy(KeyValueMapper m).withOptionalArg(...). Of course, this all depends on maintaining backward compatibility.
>                                             Unfortunately, the below approach would require at least 2 (probably 3) overloads of each of count, reduce, and aggregate: one for returning a KTable and one for returning a KTable with a Windowed key (you would probably want to split windowed and session-windowed for ease of implementation). Obviously not exhaustive, but enough for you to get the picture. Count, Reduce, and Aggregate supply 3 static methods to initialize the builder:
>                                                 // Count
>                                                 KTable<String, Long> count =
>                                                     groupedStream.count(Count.count().withQueryableStoreName("my-store"));
>                                                 // Windowed Count
>                                                 KTable<Windowed<String>, Long> windowedCount =
>                                                     groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10)).withQueryableStoreName("my-windowed-store"));
>                                                 // Session Count
>                                                 KTable<Windowed<String>, Long> sessionCount =
>                                                     groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L)).withQueryableStoreName("my-session-windowed-store"));
>                                         Above and below, I think I'd prefer it to be:
>                                             groupedStream.count(/** non windowed count **/)
>                                             groupedStream.windowed(TimeWindows.of(10L)).count(...)
>                                             groupedStream.sessionWindowed(SessionWindows.with(10L)).count(...)
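Damian's preferred shape, where `windowed(...)` and `sessionWindowed(...)` are separate steps before `count(...)`, can be sketched with distinct intermediate types so that each `count()` has a single, unambiguous return type. `Grouped`, `WindowedGrouped`, `SessionGrouped`, `Table` and `Windowed` below are simplified stand-ins, not the Kafka Streams classes, and window sizes are plain `long` values.

```java
// Hypothetical sketch of "groupedStream.windowed(...).count(...)": windowing is a
// separate step returning its own type, so count() needs no windowed overloads.
// None of these classes are Kafka Streams APIs.
final class WindowedStepDemo {

    /** Marker type standing in for a windowed key. */
    static final class Windowed<K> { }

    /** Stand-in for a KTable: it only records how it was built. */
    static final class Table<K, V> {
        final String description;
        Table(String description) { this.description = description; }
    }

    static final class Grouped<K> {
        Table<K, Long> count() { return new Table<>("count"); }
        WindowedGrouped<K> windowed(long sizeMs) { return new WindowedGrouped<>(sizeMs); }
        SessionGrouped<K> sessionWindowed(long gapMs) { return new SessionGrouped<>(gapMs); }
    }

    static final class WindowedGrouped<K> {
        private final long sizeMs;
        WindowedGrouped(long sizeMs) { this.sizeMs = sizeMs; }
        Table<Windowed<K>, Long> count() { return new Table<>("windowed count, size=" + sizeMs); }
    }

    static final class SessionGrouped<K> {
        private final long gapMs;
        SessionGrouped(long gapMs) { this.gapMs = gapMs; }
        Table<Windowed<K>, Long> count() { return new Table<>("session count, gap=" + gapMs); }
    }

    public static void main(String[] args) {
        Grouped<String> grouped = new Grouped<>();
        Table<String, Long> plain = grouped.count();
        Table<Windowed<String>, Long> windowed = grouped.windowed(10L).count();
        Table<Windowed<String>, Long> session = grouped.sessionWindowed(10L).count();
        System.out.println(plain.description);    // count
        System.out.println(windowed.description); // windowed count, size=10
        System.out.println(session.description);  // session count, gap=10
    }
}
```

Compared with overloads on `Count.Windowed` and `Count.SessionWindowed`, the windowing choice here is carried by the receiver type rather than by the argument type.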
>                                                 // Reduce
>                                                 Reducer<Long> reducer;
>                                                 KTable<String, Long> reduce = groupedStream.reduce(reducer,
>                                                     Reduce.reduce().withQueryableStoreName("my-store"));
>                                                 // Aggregate Windowed with Custom Store
>                                                 Initializer<String> initializer;
>                                                 Aggregator<String, Long, String> aggregator;
>                                                 KTable<Windowed<String>, String> aggregate = groupedStream.aggregate(initializer, aggregator,
>                                                     Aggregate.windowed(TimeWindows.of(10L).until(10)).withStateStoreSupplier(stateStoreSupplier));
>                                                 // Cogroup SessionWindowed
>                                                 KTable<Windowed<String>, String> cogrouped = groupedStream1.cogroup(aggregator1)
>                                                     .cogroup(groupedStream2, aggregator2)
>                                                     .aggregate(initializer, aggregator,
>                                                         Aggregate.sessionWindowed(SessionWindows.with(10L), sessionMerger).withQueryableStoreName("my-store"));
>                                                 public class Count {
>                                                     public static class Windowed extends Count {
>                                                         private Windows windows;
>                                                     }
>                                                     public static class SessionWindowed extends Count {
>                                                         private SessionWindows sessionWindows;
>                                                     }
>                                                     public static Count count();
>                                                     public static Windowed windowed(Windows windows);
>                                                     public static SessionWindowed sessionWindowed(SessionWindows sessionWindows);
>                                                     // All withXXX(...) methods.
>                                                 }
>                                                 public interface KGroupedStream<K, V> {
>                                                     KTable<K, Long> count(Count count);
>                                                     KTable<Windowed<K>, Long> count(Count.Windowed count);
>                                                     KTable<Windowed<K>, Long> count(Count.SessionWindowed count);
>                                                     // ...
>                                                 }
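Kyle's sketch relies on Java overload resolution: `count(Count)`, `count(Count.Windowed)` and `count(Count.SessionWindowed)` are chosen by the argument's static type at compile time. The self-contained version below uses simplified stand-ins (window sizes as `long`, return values as strings naming the chosen overload) and also shows one pitfall of this design: a `Count.Windowed` held in a variable of type `Count` selects the non-windowed overload.

```java
// Hypothetical stand-ins for the proposed Count builders; NOT Kafka Streams classes.
// Java picks the overload from the argument's static (compile-time) type.
final class CountOverloadDemo {

    static class Count {
        static Count count() { return new Count(); }
        static Windowed windowed(long sizeMs) { return new Windowed(sizeMs); }
        static SessionWindowed sessionWindowed(long gapMs) { return new SessionWindowed(gapMs); }

        static final class Windowed extends Count {
            final long sizeMs;
            Windowed(long sizeMs) { this.sizeMs = sizeMs; }
        }

        static final class SessionWindowed extends Count {
            final long gapMs;
            SessionWindowed(long gapMs) { this.gapMs = gapMs; }
        }
    }

    // Stand-in for KGroupedStream; each overload returns the result type it would produce.
    static final class GroupedStream {
        String count(Count c) { return "KTable<K, Long>"; }
        String count(Count.Windowed c) { return "KTable<Windowed<K>, Long> (time windows)"; }
        String count(Count.SessionWindowed c) { return "KTable<Windowed<K>, Long> (session windows)"; }
    }

    public static void main(String[] args) {
        GroupedStream grouped = new GroupedStream();
        System.out.println(grouped.count(Count.count()));             // KTable<K, Long>
        System.out.println(grouped.count(Count.windowed(10L)));       // ... (time windows)
        System.out.println(grouped.count(Count.sessionWindowed(10L))); // ... (session windows)

        // Pitfall: the static type decides, so this call picks count(Count).
        Count widened = Count.windowed(10L);
        System.out.println(grouped.count(widened));                   // KTable<K, Long>
    }
}
```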
>                                             Thanks,
>                                             Kyle
>                                             From: Guozhang Wang
>                                             Sent: Wednesday, June 28, 2017 7:45 PM
>                                             To: d...@kafka.apache.org <mailto:d...@kafka.apache.org>
>                                             Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>                                                 I played with the current proposal a bit using https://github.com/dguy/kafka/tree/dsl-experiment <https://github.com/dguy/kafka/tree/dsl-experiment>, and here are my observations:
>                                                 1. Personally I prefer "stream.group(mapper) / stream.groupByKey()" over "stream.group().withKeyMapper(mapper) / stream.group()", since 1) withKeyMapper is not enforced programmatically even though it is not "really" optional like the others, and 2) syntax-wise it reads more naturally.
>                                                 I think it is okay to add the APIs in (https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/main/java/org/apache/kafka/streams/kstream/GroupedStream.java) in KGroupedStream.
>                                                 2. For the "withStateStoreSupplier" API, is the user supposed to pass in the innermost state store supplier (e.g. the one whose get() returns RocksDBStore), or is it supposed to return the outermost supplier with logging / metrics / etc.? I think it would be more useful to only require users to pass in the inner state store supplier while specifying caching / logging through other APIs.
>                                                 In addition, the "GroupedWithCustomStore" is a bit suspicious to me: we are allowing users to call other APIs like "withQueryableName" multiple times, but call "withStateStoreSupplier" only once at the end. Why is that?
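Guozhang's suggestion in point 2, that users pass only the innermost store supplier while caching and logging are configured through other APIs, amounts to the library wrapping the user's store in decorator layers. The sketch below assumes hypothetical `Store`, `InMemoryStore` and `ChangeLoggingStore` types (an in-memory map stands in for RocksDB and a list for the changelog topic) and shows only a logging layer; a caching layer could be added the same way.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch: the user supplies only the innermost store; the library wraps it
// according to separate options. None of these types are Kafka Streams APIs.
final class StoreLayeringDemo {

    interface Store {
        void put(String key, long value);
        Long get(String key);
        String describe();
    }

    /** Innermost store, standing in for the user's RocksDB-backed store. */
    static final class InMemoryStore implements Store {
        private final Map<String, Long> data = new HashMap<>();
        public void put(String key, long value) { data.put(key, value); }
        public Long get(String key) { return data.get(key); }
        public String describe() { return "inner"; }
    }

    /** Decorator that records every write, standing in for a changelog topic. */
    static final class ChangeLoggingStore implements Store {
        private final Store inner;
        final List<String> changelog = new ArrayList<>();
        ChangeLoggingStore(Store inner) { this.inner = inner; }
        public void put(String key, long value) {
            inner.put(key, value);
            changelog.add(key + "=" + value);
        }
        public Long get(String key) { return inner.get(key); }
        public String describe() { return "logging(" + inner.describe() + ")"; }
    }

    /** Library-side assembly: wrap the user's inner supplier based on an option. */
    static Store build(Supplier<Store> innerSupplier, boolean loggingEnabled) {
        Store store = innerSupplier.get();
        return loggingEnabled ? new ChangeLoggingStore(store) : store;
    }

    public static void main(String[] args) {
        Store store = build(InMemoryStore::new, true);
        store.put("a", 1L);
        System.out.println(store.describe() + " a=" + store.get("a")); // logging(inner) a=1
    }
}
```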
>                                                 3. The current DSL seems to be only for aggregations; what about joins?
>                                                 4. I think it is okay to keep the "withLogConfig": for the StateStoreSupplier it will still be user code specifying the topology, so I do not see a big difference.
>                                                 5. Should "WindowedGroupedStream"'s withStateStoreSupplier take the windowed state store supplier to enforce typing?
>                                                 Below are minor ones:
>                                                 6. "withQueryableName": maybe better "withQueryableStateName"?
>                                                 7. "withLogConfig": maybe better "withLoggingTopicConfig()"?
>                                                 Guozhang
>                                                 On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax <matth...@confluent.io <mailto:matth...@confluent.io>> wrote:
>                                                     I see your point about "when to add the processor to the topology". That is indeed an issue. Not sure if we could allow "updates" to the topology...
>                                                     I don't see any problem with having all the withXX() in the KTable interface -- but this might be subjective.
>                                                     However, I don't understand your argument about putting aggregate() after the withXX() -- all the calls to withXX() set optional parameters for aggregate() and not for groupBy() -- but a groupBy().withXX() indicates that the withXX() belongs to the groupBy(). IMHO, this might be quite confusing for developers.
>                                                     -Matthias
>                                                     On 6/28/17 2:55 AM, Damian Guy wrote:
>                                                             I also think that mixing optional parameters with configs is a bad idea. I have no proposal for this atm but just wanted to mention it. Hope to find some time to come up with something.
>                                                         Yes, I don't like the mix of config either. But the only real config here is the logging config - which we don't really need, as it can already be done via a custom StateStoreSupplier.
>                                                             What I don't like in the current proposal is the .grouped().withKeyMapper() -- the current solution with .groupBy(...) and .groupByKey() seems better. For clarity, we could rename to .groupByNewKey(...) and .groupByCurrentKey() (even if we should find some better names).
>                                                         It could be groupByKey(), groupBy() or something different bt
>                                                             The proposed pattern "chains" grouping and aggregation too close together. I would rather separate both more than less, i.e., go in the opposite direction.
>                                                             I am also wondering if we could do something more "fluent". The initial proposal was like:
>                                                                 groupedStream.count()
>                                                                     .withStoreName("name")
>                                                                     .withCachingEnabled(false)
>                                                                     .withLoggingEnabled(config)
>                                                                     .table()
>                                                             The .table() statement in the end was kinda alien.
>                                                         I agree, but then all of the withXXX methods need to be on KTable, which is worse in my opinion. You also need something that is going to "build" the internal processors and add them to the topology.
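The fluent variant quoted above, with a terminal `.table()`, can be sketched to show Damian's point that some step must actually build the processor and add it to the topology. In this hypothetical sketch (`Topology`, `GroupedStream` and `PendingCount` are illustrative, not Kafka Streams classes), the `withXXX()` calls only collect options and nothing is added until `table()` is called.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: count() returns a pending operation that only collects options;
// the terminal table() call is what adds a processor node. NOT Kafka Streams APIs.
final class TerminalBuildDemo {

    static final class Topology {
        final List<String> processors = new ArrayList<>();
    }

    static final class PendingCount {
        private final Topology topology;
        private String storeName = "<generated>";
        private boolean cachingEnabled = true;

        PendingCount(Topology topology) { this.topology = topology; }

        PendingCount withStoreName(String name) { this.storeName = name; return this; }
        PendingCount withCachingEnabled(boolean enabled) { this.cachingEnabled = enabled; return this; }

        /** Terminal step: only here is a processor added to the topology. */
        String table() {
            String node = "count(store=" + storeName + ", caching=" + cachingEnabled + ")";
            topology.processors.add(node);
            return node;
        }
    }

    static final class GroupedStream {
        private final Topology topology;
        GroupedStream(Topology topology) { this.topology = topology; }
        PendingCount count() { return new PendingCount(topology); }
    }

    public static void main(String[] args) {
        Topology topology = new Topology();
        GroupedStream grouped = new GroupedStream(topology);

        grouped.count().withStoreName("forgotten");   // no table(): nothing is added
        grouped.count().withStoreName("name").withCachingEnabled(false).table();

        System.out.println(topology.processors);      // [count(store=name, caching=false)]
    }
}
```

Note that `PendingCount` is a mutable builder (each `withXXX()` returns `this`), which is the kind of shared-reference hazard raised about builders in this thread.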
>                                                             The current proposal put the count() into the end -- i.e., the optional parameters for count() have to be specified on the .grouped() call -- this does not seem to be the best way either.
>                                                         I actually prefer this method as you are building a grouped stream that
>     ...
>     [Message clipped]
> --
> -- Guozhang
