Hi Damian,

Thanks for taking the time. I think you read my points individually, but you seem to be missing the bigger picture I am trying to paint.

Of the three problems I mentioned - and that you agreed are problems - you are only trying to address the first.

What I am trying to tell you is that if you focus on the latter two, the first one comes for free. If, on the other hand, you focus on the first - please allow me to call it the easy part - all you are going to achieve is to break user land and sugar-coat the real problems.

This takes away overloads, but still leaves it a mess to implement new features. I am currently trying to prep a patch for KAFKA-3705 and I do not understand why I should deal with Interactive Queries whatsoever. My output table has a proper ValueGetterSupplier.
That should be it!

I hope I made clear that improving here takes quite some hard work, that it would be rewarding, and that just sugar-coating everything
is one of the worst steps we could take from where we are at the moment.

Looking at KAFKA-5581 that you mentioned: none of its points are really related to what I am saying. Each of them is tricky and
requires some careful thinking, but might work out.

Further, looking at your comment that refers to KIP vs. DISCUSS: I don't know what I should take from that.

Regarding your comment that getQueryHandle() wouldn't work: it is the same thing as giving the user a queryable string. It works the same way, with the only difference that we have a wrapper object that gives the user what he wants instantly! Instead of giving him a String to get a store, we just give him the store, plus we don't hand out some inflexible native types that we later on have no control over.
The whole logic about partitioners and everything else does not change.
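
To sketch what I mean (QueryHandle is a hypothetical name; nothing like it exists in Streams today):

```java
// Hypothetical sketch only - no such type exists in Kafka Streams.
// The handle remembers its table's processor name and routes reads
// through the ValueGetterSupplier path instead of a dedicated store.
public interface QueryHandle<K, V> {
    V get(K key);   // point lookup; partitioner/routing logic unchanged
}

// Usage: instead of materializing under a store name and later doing a
// String-based lookup, the user gets the wrapper directly:
// QueryHandle<String, Long> handle = table.getQueryHandle();
// Long value = handle.get("some-key");
```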

I hope this makes my points clearer.

Best Jan


On 19.07.2017 12:03, Damian Guy wrote:
Hi Jan,

Thanks for your input. Comments inline

On Tue, 18 Jul 2017 at 15:21 Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi,


1. Too many overloads:
Currently, whenever a KTable is the result of an operation, it gets an
overload with stateStoreName and StateStoreSupplier in case people want
to query it.
This is what produces two thirds of the overloaded methods right now (not
counting methods returning KStream).


As you state further down we are trying to address this.


2. Code copy and pasting.
Almost all KTableProcessorSuppliers have the same block of
if (name != null) store.put(k, v).
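
For context, a paraphrase of that duplicated block (not the exact internals; computeValue stands in for the operator-specific logic):

```java
// Repeated in nearly every KTableProcessorSupplier's processor:
public void process(final K key, final Change<V> change) {
    final V newValue = computeValue(key, change);
    if (queryableName != null) {      // only when the user asked for IQ
        store.put(key, newValue);     // keep a second, queryable copy in sync
    }
    context().forward(key, new Change<>(newValue, null));
}
```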


Yes, I agree. That is related to the KTable queryable store etc., and can
easily be addressed, but it isn't necessarily part of this as it doesn't
need to be a public interface change, i.e., we can clean that up in the
background.


3. Runtime inefficiencies.
Each queryable table almost instantly causes another store to be
required, storing data equivalent to that of upstream KTables.

Agreed. Again, this is not a public interface change. We don't need another
store, i.e., we can just use a "view" on the existing store, which is
basically just using the KTableValueGetter to apply the map or filter
operation to the original store. We also have this JIRA,
https://issues.apache.org/jira/browse/KAFKA-5581, to look into optimizing
when we do and don't need to add additional changelogs.
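
A rough sketch of that "view" idea (MappedStoreView is a hypothetical class, not the actual internals): reads go through the parent's value getter and the mapper is applied on the fly, so no second store is needed.

```java
class MappedStoreView<K, V, VR> {
    private final KTableValueGetter<K, V> parentGetter; // reads the parent's store
    private final ValueMapper<V, VR> mapper;            // e.g. from a mapValues()

    MappedStoreView(final KTableValueGetter<K, V> parentGetter,
                    final ValueMapper<V, VR> mapper) {
        this.parentGetter = parentGetter;
        this.mapper = mapper;
    }

    VR get(final K key) {
        final V value = parentGetter.get(key);
        return value == null ? null : mapper.apply(value);
    }
}
```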


So I really see us tackling only the first part currently, which in my
opinion is too short-sighted a basis for settling on a public API.

We are not settling on the public API. We do, however, need to do KIPs for
public API discussions. For internal changes we don't necessarily need to
have a public discussion about them.


This is why I want to tackle our approach to IQ first, as it seems to me
to be the most disruptive thing and the cause of most problems.

The Plan:

Tables created from a topic, from a KStream (I don't even like this one,
but it is probably needed for some kind of enhanced flexibility), or from
aggregations would be the only KTables whose processors get associated
with a state store. For these operations one could keep the
"StateStoreSupplier" overload, but not the "queryableStateStore" overload.
From this point on the KTable abstraction would be considered restored.
All the overloads of join and through with respect to IQ would go away;
"through" might even go away completely, given how little benefit it adds.
The method I would add is for a table to get a QueryHandle.
This query handle would, underneath, remember its table's processor name.
To access the data from IQ we would not rely on the "per-processor state
store" but go the usual path through the ValueGetterSupplier.
*Note:* We do not necessarily have a Serde for V, especially after
mapValues, nor for any intermediate data types. It would be each
KTableProcessor's job to provide a serialized version of the upstream
data types. A KTableKTableJoin would need to bring a
JoinInputSerializer<V1,V2> that serializes both upstream values for
transport across boxes.
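
A minimal sketch of that JoinInputSerializer idea (hypothetical class; null handling omitted): wrap both upstream serializers so the joined pair can cross the wire without a user-provided Serde for the intermediate type.

```java
import java.nio.ByteBuffer;
import org.apache.kafka.common.serialization.Serializer;

class JoinInputSerializer<V1, V2> {
    private final Serializer<V1> leftSerializer;
    private final Serializer<V2> rightSerializer;

    JoinInputSerializer(final Serializer<V1> left, final Serializer<V2> right) {
        this.leftSerializer = left;
        this.rightSerializer = right;
    }

    byte[] serialize(final String topic, final V1 left, final V2 right) {
        final byte[] l = leftSerializer.serialize(topic, left);
        final byte[] r = rightSerializer.serialize(topic, right);
        // length-prefix the left value so the pair can be split on read
        return ByteBuffer.allocate(4 + l.length + r.length)
                .putInt(l.length).put(l).put(r).array();
    }
}
```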

This first step would kill all the "storeName"-based overloads plus many
StateStoreSupplier overloads. It would also avoid the bloated copy-pasting
in each KTableProcessor for maintaining the store.
It would also make the runtime more efficient, in that it would not store
the same data twice just for access from IQ - tackling not only problem 1
but all three problems mentioned above.

From here, ~3 or 4 methods (from KStream, topic, or aggregate) would
still be stuck with the StateStoreSupplier overload. For me this is quite
an improvement already. To reduce the overloads further, I am thinking of
adding a nullable Properties argument to these operations. If people want
to use all defaults they could pass in null and it wouldn't be too
painful. That doesn't necessarily require them to have config files lying
around: they could, if they wanted, use property files to create such
Properties objects, and we would also offer to look up configs in the
streams properties.
So the complexity of distributing property files is optional, and the
user may choose to fill the configs by code or by files.
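
For illustration, such a nullable-Properties overload could be used like this (the overload and the config key are both hypothetical):

```java
Properties joinConfig = new Properties();
joinConfig.put("store.caching.enabled", "false");   // hypothetical per-operator key

KTable<String, Long> tuned = table1.join(table2, joiner, joinConfig);

// all defaults - just pass null, no config file needed:
KTable<String, Long> plain = table1.join(table2, joiner, null);
```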

I think these steps can rescue the proper abstraction of a KTable. I
believe that with the current proposals we are only sugar-coating problem
1 and will end up with a broken idea of what a KTable is.
I think it will be even harder to develop further from there. Interface-
wise my proposal is like developing backwards, as I am very certain we
took a wrong turn with IQ that we shouldn't try to carry through.

I hope I could explain how this refactoring can tackle the 3 problems
above, and especially why I don't think we can win by tackling only
point 1 in the long run.
If anything needs an implementation draft, please feel free to ask me to
provide one. Initially the proposal would hopefully get the job done of
just removing clutter.


I agree with some of what you have said in the above few paragraphs. I
think you are correct that KTable has become littered with a bunch of
methods to make each stage queryable, i.e., the overloads for
queryableStoreName and StateStoreSupplier. I think we can do away with
both of them, as once you have a KTable you can always build a view of it
using the KTableValueGetter. So we never need a StateStoreSupplier: we
already have one from when the original KTable was created. We can also
possibly remove the overloads with queryableName and always use a
generated name that can be retrieved from the method
`String queryableStoreName()` - this can then be used with IQ if needed.

The getQueryHandle idea you mention won't really work as things stand. The
KTable knows nothing about its runtime context; it is purely for building
a topology that can be executed. In order to successfully query a `KTable`
(state store) we need to know how many partitions there are and on which
threads the state stores are running. This is why we added the `store`
API to `KafkaStreams`, as this is the execution environment that has all
of the information.
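
For reference, the existing path looks roughly like this (current API, only lightly paraphrased):

```java
KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.start();

// The running instance knows partitions and thread placement,
// so it can resolve the store by its queryable name:
ReadOnlyKeyValueStore<String, Long> view =
    streams.store("my-store", QueryableStoreTypes.<String, Long>keyValueStore());
Long value = view.get("some-key");
```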


Thanks,
Damian

Looking forward to your comments.
Best Jan



On 12.07.2017 21:27, Guozhang Wang wrote:
Hello Jan,

Thanks for your feedback. Let me try to clarify a few things about the
problems we are trying to resolve and the motivations behind the
current proposals.

As Matthias mentioned, one issue that we are trying to tackle is to
reduce the number of overloaded functions in the DSL due to the serde
overrides / state store supplier overrides that are needed for
repartitioning or for state store materialization. Another related
issue is that the current state store supplier override is not very
natural to use. For example:

1) If a user just wants to disable caching / logging etc. but does not
want to change the underlying store engine at all, she needs to learn,
for example, whether a windowed store or a key-value store is needed for
this specific operator in the DSL and which serdes are needed to
materialize the store, in order to create a StateStoreSupplier with
caching / logging disabled and then pass it into the DSL (see the sketch
after point 3 below).

2) Similarly, if a user just wants to set different topic configs for
the changelog topic, she still needs to pass the whole StateStoreSupplier
into the operator.

3) If a user wants to use a different store engine underneath (e.g.
MyStore instead of RocksDBStore) but does not care about the default
settings for logging, caching, etc., he STILL needs to pass the whole
StateStoreSupplier into the operator.
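
To make scenario 1 concrete: just flipping logging off today means rebuilding the whole supplier (sketched against the pre-KIP-182 Stores builder; method names are approximate, treat this as a sketch):

```java
StateStoreSupplier supplier = Stores.create("counts-store")
    .withKeys(Serdes.String())     // the user must know the key serde,
    .withValues(Serdes.Long())     // the value serde,
    .persistent()                  // and that a persistent KV store is needed,
    .disableLogging()              // just to change this one setting
    .build();

KTable<String, Long> counts = groupedStream.count(supplier);
```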

Note that all the above scenarios are for advanced users who do want
to override these settings; users who are just OK with the default
settings should not be exposed to such APIs at all - like you said, they
should not be exposed to any of such implementation details if they do
not care.

-----------------

We have been talking about configs vs. code for such settings, since we
have been using configs for "global" default configs; but the argument
against using configs for such per-operator / per-store settings as well
is that it will simply make configs hard to manage and hard to wire with
tools. Personally speaking, I'm not a big fan of using configs for
per-entity overrides, and that is mainly from my experience with Samza:
Samza inherits exactly the same approach for per-stream / per-source
configs:

http://samza.apache.org/learn/documentation/0.13/jobs/configuration-table.html
([system-name] and [stream-id] etc. are all place-holders)

The main issues were: 1) users making config changes need to deploy them
to all the instances - I think for Streams it would be even worse, as we
would need to put a config file on each running instance, and whenever
there is a change we need to make sure it is propagated to all of them;
and 2) whenever users make some code changes, e.g. to add a new stream /
system, they need to remember to make the corresponding changes in the
config files as well, and they kept forgetting about it. The lesson
learned there was that it is always better to change one place (the code)
than two (the code plus a config file).

Again, this is not saying we have vetoed this option, and if people
have good reasons for this let's discuss them here.

-----------------

So the current proposals are mainly about keeping configs for the global
default settings, while still allowing users to override per-operator /
per-store settings in the code, and also keeping in mind not to force
users to think about such implementation details if they are fine with
the default settings. For example:

As a normal user it is sufficient to specify an aggregation as

```
table4.join(table5, joiner).table();
```

in which she can still just focus on the computational logic, with all
implementation details abstracted away; only if the user is familiar
enough with the implementation details (e.g. how the joined tables are
materialized into state stores) and wants to specify her own settings
(e.g. "I want to swap in my own state store engine", "I want to disable
caching for dedup", or "I want to use a different serde") does she
"explore" them with the DSL again:

```
table4.join(table5, joiner).table(Materialized.as("store1"));
// use a custom store name for interactive query
table4.join(table5, joiner).table(Materialized.as(MyStoreSupplier));
// use a custom store engine
table4.join(table5, joiner).table(Materialized.as("store1").withLoggingEnabled(configs));
// use custom store changelog topic configs
// ... more
```

Hope it helps.


Guozhang


On Fri, Jul 7, 2017 at 3:42 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

     It makes me want to cry.

     Why on earth is the DSL going to expose all its implementation
     details now? Especially whether something is materialized or not.

     If we want to take useful steps in that direction, maybe we are
     looking for a way to let the user switch back and forth between
     PAPI and DSL?

     A change like the proposed one would not eliminate any of my pain
     points while still being a heck of a lot of work to migrate towards.

     Since I am only following this from the point where Eno CC'ed it
     into the users list:

     Can someone please rephrase for me what problem this is trying to
     solve? I don't mean to be rude, but it uses a problematic feature
     ("StateStoreSuppliers in the DSL") to justify making it even worse.
     This helps us nowhere in making the configs more flexible; it's
     just syntactic sugar.

     A low-effort shot would be: let's add a Properties argument to
     operations that would otherwise become too heavily overloaded, or
     pull the configs by some naming schema from the overall properties.
     In addition to that, we get rid of StateStoreSuppliers in the DSL
     and have them also configured by said properties.

     => way easier to migrate to, way less risk, way more flexible in
     the future (different implementations of the same operation don't
     require a code change to configure)
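
     For illustration, per-operator settings pulled by such a naming
     schema might look like this (hypothetical keys, just to show the
     idea):

       streams.my-app.joins.orders-enrichment.type = left
       streams.my-app.stores.orders-store.caching.enabled = false
       streams.my-app.stores.orders-store.changelog.retention.ms = 86400000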

     Line 184 especially makes no sense to me. What is a non-materialized
     KTableKTable join anyway?

     Hope we can discuss more on this.




     On 07.07.2017 17:23, Guozhang Wang wrote:

          I messed up the indentation on the GitHub code repo; this
          would be easier to read:

         https://codeshare.io/GLWW8K


         Guozhang


          On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang
          <wangg...@gmail.com> wrote:

             Hi Damian / Kyle,

              I think I agree with you guys about the pros / cons of
              using the builder pattern vs. using some "secondary
              classes", and I'm wondering if we can take a middle road
              between the two. I spent some time on a slightly
              different approach from Damian's current proposal:


https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/java/org/apache/kafka/streams/RefactoredAPIs.java

              The key idea is to use the final "table()" or "stream()"
              call to "upgrade" from the secondary classes to the
              first-class citizens, while having all the specs inside
              this function. This proposal also includes some other
              refactoring that people have discussed for the builder,
              to reduce the overloaded functions as well. WDYT?


             Guozhang


              On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy
              <damian....@gmail.com> wrote:

                 Hi Jan,

                 Thanks very much for the input.

                  On Tue, 4 Jul 2017 at 08:54 Jan Filipiak
                  <jan.filip...@trivago.com> wrote:

                     Hi Damian,

                      I do see your point that something needs to
                      change, but I fully agree with Guozhang when he
                      says:
                      ---
                      But since this is an incompatibility change, and
                      we are going to remove the compatibility
                      annotations soon, it means we only have one
                      chance and we really have to make it right.
                      ----


                 I think we all agree on this one! Hence the discussion.


                      I fear none of the suggestions goes far enough
                      to become something that will carry on for very
                      much longer.
                      I am currently working on KAFKA-3705 and am
                      trying to find the easiest way for the user to
                      give me all the required functionality. The
                      easiest interface I could come up with so far
                      can be seen here:



https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622


                      And it's already horribly complicated. I am
                      currently unable to find the right abstraction
                      level to have everything fall into place
                      naturally. To be honest, I already think
                      introducing

                  To be fair that is not a particularly easy problem
                  to solve!



https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L493

                      was unideal and makes everything a mess.


                  I'm not sure I agree that it makes everything a
                  mess, but it could have been done differently.

                  The JoinType:Whatever approach is also not really
                  flexible. Two things come to mind:

                      1. I don't think we should rule out config-based
                      decisions, say configs like:

                       streams.$applicationID.joins.$joinname.conf = value

                  Is this just for config? Or are you suggesting that
                  we could somehow "code" the join in a config file?


                      This can allow for tremendous changes without a
                      single API change, and IMO it has not been
                      considered enough yet.

                      2. Push logic from the DSL into the callback
                      classes. A ValueJoiner, for example, can be used
                      to implement different join types as the user
                      wishes.
                 Do you have an example of how this might look?
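
                  (One possible reading, sketched as a hypothetical
                  illustration: a single join primitive whose
                  ValueJoiner sees null for a missing side, so
                  left/outer semantics live in the callback. Order,
                  Customer, and EnrichedOrder are made-up types.)

                  ValueJoiner<Order, Customer, EnrichedOrder> leftStyle =
                      (order, customer) -> customer == null
                          ? new EnrichedOrder(order, Customer.UNKNOWN)  // keep unmatched left side
                          : new EnrichedOrder(order, customer);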


                      As Guozhang said: not breaking users is very
                      important.


                 Of course. We want to make it as easy as possible for
                 people to use
                 streams.


                      Especially with these changes plus all the plans
                      I sadly only have in my head - but hopefully the
                      first link can give a glimpse.

                      Thanks for preparing the examples; that made it
                      way clearer to me what exactly we are talking
                      about. I would argue to go a bit slower and more
                      carefully on this one. At some point we need to
                      get it right. Peeking over to the Hadoop guys
                      with their huge user base: config files really
                      work well for them.

                     Best Jan





                     On 30.06.2017 09:31, Damian Guy wrote:

                         Thanks Matthias

                          On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax
                          <matth...@confluent.io> wrote:

                              I am just catching up on this thread, so
                              sorry for the long email in advance...
                              Also, it's to some extent a dump of
                              thoughts and not always a clear proposal.
                              I still need to think about this in more
                              detail, but maybe it helps others to get
                              new ideas :)


                                      However, I don't understand your
                                      argument about putting
                                      aggregate() after the withXX() --
                                      all the calls to withXX() set
                                      optional parameters for
                                      aggregate() and not for
                                      groupBy() -- but a
                                      groupBy().withXX() indicates that
                                      the withXX() belongs to the
                                      groupBy(). IMHO, this might be
                                      quite confusing for developers.


                                  I see what you are saying, but the
                                  grouped stream is effectively a no-op
                                  until you call one of the
                                  aggregate/count/reduce etc.
                                  functions. So the optional params are
                                  ones that are applicable to any of
                                  the operations you can perform on
                                  this grouped stream. Then the final
                                  count()/reduce()/aggregate() call has
                                  any of the params that are
                                  required/specific to that function.

                              I understand your argument, but don't
                              share the conclusion. If we need a
                              "final/terminal" call, the better way
                              might be

                              .groupBy().count().withXX().build()

                              (with a better name for build() though)


                          The point is that all the other calls, i.e.,
                          withBlah, windowed, etc., apply to all the
                          aggregate functions, the terminal call being
                          the actual type of aggregation you want to
                          do. I personally find this more natural than
                          groupBy().count().withBlah().build().
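
                          For contrast, the two shapes under
                          discussion might look like this (both are
                          hypothetical API sketches, not existing
                          methods):

                          // terminal aggregate first, options after, then build():
                          KTable<String, Long> a =
                              stream.groupByKey().count().withQueryableStoreName("store-a").build();

                          // shared options first, the aggregate as the terminal call:
                          KTable<String, Long> b =
                              stream.groupByKey().withQueryableStoreName("store-b").count();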


                                  groupedStream.count(/** non windowed count **/)
                                  groupedStream.windowed(TimeWindows.of(10L)).count(...)
                                  groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
                              I like this. However, I don't see a
                              reason to have windowed() and
                              sessionWindowed(). We should have one
                              top-level `Windows` interface that both
                              `TimeWindows` and `SessionWindows`
                              implement, and just have a single
                              windowed() method that accepts all
                              `Windows`. (I did not like the separation
                              of `SessionWindows` in the first place,
                              and this seems to be an opportunity to
                              clean this up. It was hard to change when
                              we introduced session windows.)

                         Yes - true we should look into that.


                              Btw: we do use the imperative groupBy()
                              and groupByKey(), and thus we might also
                              want to use windowBy() (instead of
                              windowed()). Not sure how important this
                              is, but it seems inconsistent otherwise.


                         Makes sense


                              About joins: I don't like
                              .withJoinType(JoinType.LEFT) at all. I
                              think defining an inner/left/outer join
                              is not an optional argument but a
                              first-class concept, and it should have
                              a proper representation in the API (like
                              the current methods join(), leftJoin(),
                              outerJoin()).


                          Yep, I did originally have it as a required
                          param, and maybe that is what we go with. It
                          could have a default, but maybe that is
                          confusing.



                              About the two join API proposals, the
                              second one has too much boilerplate code
                              for my taste. Also, the actual join()
                              operator has only one argument, which is
                              weird to me: in my thinking, the main
                              operator call should have one parameter
                              per mandatory argument, but your proposal
                              puts the mandatory arguments into the
                              Joins.streamStreamJoin() call. This is
                              far from intuitive IMHO.


                          This is the builder pattern: you only need
                          one param, as the builder has captured all
                          of the required and optional arguments.


                              The first join proposal also seems to
                              align better with the pattern suggested
                              for aggregations, and having the same
                              pattern for all operators is important
                              (as you stated already).


                          This is why I offered the two alternatives I
                          started out with: one is the builder pattern,
                          the other the more fluent pattern.


                              Coming back to configs vs. optional
                              parameters: what about having a method
                              withConfig[s](...) that allows putting
                              in the configuration?


                          Sure, it is currently called withLogConfig(),
                          as that is the only thing that is really
                          config.


                              This also raises the question of whether
                              until() is a window property. Actually,
                              until() seems to be a configuration
                              parameter and thus should not have its
                              own method.


                          Hmmm, I don't agree. until() is a property
                          of the window. It is potentially going to be
                          different for every window operation you do
                          in a streams app.


                              Browsing through your example DSL branch,
                              I also saw this one:

                                  final KTable<Windowed<String>, Long> windowed =
                                      groupedStream.counting()
                                          .windowed(TimeWindows.of(10L).until(10))
                                          .table();

                              This is an interesting idea, and it
                              reminds me of some feedback: "I wanted to
                              count a stream, but there was no count()
                              method -- I first needed to figure out
                              that I need to group the stream to be
                              able to count it. It does make sense in
                              hindsight, but was not obvious in the
                              beginning." Thus, carrying this thought
                              further, we could also do the following:


  stream.count().groupedBy().windowedBy().table();
                              -> Note, I use "grouped" and "windowed"
                              instead of the imperative here, as it
                              comes after the count().

                              This would be more consistent than your
                              proposal (which has grouping before but
                              windowing after count()). It might even
                              allow us to enrich the API with some
                              syntactic sugar like
                              `stream.count().table()` to get the
                              overall count of all records (this would
                              obviously not scale, but we could
                              support it -- if not now, maybe later).


                          I guess I'd prefer
                          stream.groupBy().windowBy().count()
                          stream.groupBy().windowBy().reduce()
                          stream.groupBy().count()

                          As I said above, everything that happens
                          before the final aggregate call can be
                          applied to any of them. So it makes sense to
                          me to do those things ahead of the final
                          aggregate call.


                              Last, about the builder pattern. I am
                              convinced that we need some "terminal"
                              operator/method that tells us when to add
                              the processor to the topology. But I
                              don't see the need for a plain builder
                              pattern, which feels alien to me (see my
                              argument about the second join proposal).
                              Using .stream() / .table() as used in
                              many examples might work. But maybe a
                              more generic name that we can use in all
                              places, like build() or apply(), might
                              also be an option.


                         Sure, a generic name might be ok.




                             -Matthias



                             On 6/29/17 7:37 AM, Damian Guy wrote:

                                 Thanks Kyle.

                                  On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman
                                  <winkelman.k...@gmail.com> wrote:

                                     Hi Damian,

                                    When trying to program in the fluent API
                                    that has been discussed most, it feels
                                    difficult to know when you will actually
                                    get an object you can reuse. What if I
                                    make one KGroupedStream that I want to
                                    reuse - is it legal to reuse it, or does
                                    this approach expect you to call grouped
                                    each time?

                                  I'd anticipate that once you have a
                                  KGroupedStream you can re-use it as you
                                  can today.

                                    You said it yourself in another post
                                    that the grouped stream is effectively a
                                    no-op until a count, reduce, or
                                    aggregate. The way I see it, you
                                    wouldn't be able to reuse anything
                                    except KStreams and KTables, because
                                    most of this fluent API would continue
                                    returning this (this being the builder
                                    object currently being manipulated).

                                    So, if you ever store a reference to
                                    anything but KStreams and KTables and
                                    you use it in two different ways, then
                                    it's possible you make conflicting
                                    withXXX() calls on the same builder.


                                  Not necessarily true. It could return a
                                  new instance of the builder, i.e., the
                                  builders being immutable. So if you held
                                  a reference to the builder it would
                                  always be the same as it was when it was
                                  created.


                                    GroupedStream<K,V> groupedStreamWithDefaultSerdes =
                                        kStream.grouped();
                                    GroupedStream<K,V> groupedStreamWithDeclaredSerdes =
                                        groupedStreamWithDefaultSerdes.withKeySerde(…).withValueSerde(…);

                                    I'll admit that this shouldn't happen,
                                    but some user is going to do it
                                    eventually… Depending on the
                                    implementation, uses of
                                    groupedStreamWithDefaultSerdes would
                                    most likely be equivalent to the
                                    version withDeclaredSerdes. One
                                    workaround would be to always make
                                    copies of the config objects you are
                                    building, but this approach has its own
                                    problem, because now we have to
                                    identify which configs are equivalent
                                    so we don't create repeated processors.

                                    The point of this long-winded example
                                    is that we always have to be thinking
                                    about all of the possible ways it could
                                    be misused by a user (causing them to
                                    see hard-to-diagnose problems).

                                 Exactly! That is the point of the
                                 discussion really.


                                    In my attempt at a couple of methods
                                    with builders, I feel that I could
                                    confidently say the user couldn't
                                    really mess it up.

                                        // Count
                                        KTable<String, Long> count =
                                            kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));

                                    The kGroupedStream is reusable, and if
                                    they attempted to reuse the Count for
                                    some reason it would throw an error
                                    message saying that a store named
                                    "my-store" already exists.


                                  Yes, I agree, and I think using builders
                                  is my preferred pattern.

                                 Cheers,
                                 Damian


                                     Thanks,
                                     Kyle

                                     From: Damian Guy
                                     Sent: Thursday, June 29, 2017 3:59 AM
                                      To: d...@kafka.apache.org
                                     Subject: Re: [DISCUSS] Streams
                                     DSL/StateStore Refactoring

                                     Hi Kyle,

                                     Thanks for your input. Really
                                     appreciated.

                                      On Thu, 29 Jun 2017 at 06:09 Kyle
                                      Winkelman <winkelman.k...@gmail.com>
                                      wrote:

                                        I like more of a builder pattern,
                                        even though others have voiced
                                        against it. The reason I like it is
                                        that it makes it clear to the user
                                        that a call to KGroupedStream#count
                                        will return a KTable, not some
                                        intermediate class that I need to
                                        understand.

                                     Yes, that makes sense.


                                        When trying to program in the
                                        fluent API that has been discussed
                                        most, it feels difficult to know
                                        when you will actually get an
                                        object you can reuse. What if I
                                        make one KGroupedStream that I want
                                        to reuse - is it legal to reuse it,
                                        or does this approach expect you to
                                        call grouped each time?

                                    I'd anticipate that once you have a
                                    KGroupedStream you can re-use it as you
                                    can today.


                                        This question doesn't pop into my
                                        head at all with the builder
                                        pattern; I assume I can reuse
                                        everything. Finally, I like
                                        .groupByKey and
                                        .groupBy(KeyValueMapper) - not a
                                        big fan of the grouped.

                                    Yes, grouped() was more for
                                    demonstration and because groupBy()
                                    and groupByKey() were taken! So I'd
                                    imagine the API would actually want to
                                    be
                                    groupByKey(/** no required args **/).withOptionalArg()
                                    and
                                    groupBy(KeyValueMapper m).withOptionalArg(...)
                                    - of course this all depends on
                                    maintaining backward compatibility.

                                        Unfortunately, the below approach
                                        would require at least 2 (probably
                                        3) overloads (one for returning a
                                        KTable and one for returning a
                                        KTable with a windowed key; we
                                        would probably want to split
                                        windowed and session-windowed for
                                        ease of implementation) of each
                                        count, reduce, and aggregate.
                                        Obviously not exhaustive, but
                                        enough for you to get the picture.
                                        Count, Reduce, and Aggregate supply
                                        3 static methods to initialize the
                                        builder:

                                        // Count
                                        KTable<String, Long> count =
                                            groupedStream.count(Count.count().withQueryableStoreName("my-store"));

                                        // Windowed Count
                                        KTable<Windowed<String>, Long> windowedCount =
                                            groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10)).withQueryableStoreName("my-windowed-store"));

                                        // Session Count
                                        KTable<Windowed<String>, Long> sessionCount =
                                            groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L)).withQueryableStoreName("my-session-windowed-store"));
                                    Above and below, I think I'd prefer it
                                    to be:

                                    groupedStream.count(/** non windowed count **/)
                                    groupedStream.windowed(TimeWindows.of(10L)).count(...)
                                    groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)



                                        // Reduce
                                        Reducer<Long> reducer;
                                        KTable<String, Long> reduce =
                                            groupedStream.reduce(reducer,
                                                Reduce.reduce().withQueryableStoreName("my-store"));

                                        // Aggregate Windowed with Custom Store
                                        Initializer<String> initializer;
                                        Aggregator<String, Long, String> aggregator;
                                        KTable<Windowed<String>, String> aggregate =
                                            groupedStream.aggregate(initializer, aggregator,
                                                Aggregate.windowed(TimeWindows.of(10L).until(10))
                                                    .withStateStoreSupplier(stateStoreSupplier));

                                        // Cogroup SessionWindowed
                                        KTable<String, String> cogrouped =
                                            groupedStream1.cogroup(aggregator1)
                                                .cogroup(groupedStream2, aggregator2)

