Just to add to Damian's points, I think the optimizations for KAFKA-5581
can be done as a second step after this API refactoring, since they should
not require any further public API changes and hence are, to me, a bit
orthogonal to this KIP. To illustrate my thinking:

```
// The library may or may not materialize the resulting KTable in a new
// store, depending on the optimizations we have; but that is abstracted
// away from the user, and even if there is a materialized store, it is
// not exposed for querying.
someOps.table();

// The library may or may not ACTUALLY materialize the resulting KTable in
// a new store, depending on the optimizations; if it ever decides to
// materialize, it will choose whatever the default storage engine is, but
// it will expose either the actual state store or just the "logical view"
// for users to query.
someOps.table(Materialized.as("store1"));

// The library is forced by the user to materialize the resulting KTable
// with the provided store supplier's storage engine; users can always
// query it with the supplier's name reference.
someOps.table(Materialized.as(MyStoreSupplier));
```

So the only case in which we have to materialize the store is when the user
enforces a store supplier, indicating that she wants to physically
materialize it with the provided storage engine; in all other cases the
library still has the freedom to choose whether to materialize or not.
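
To make that rule concrete, here is a minimal sketch in plain Java. It does
not use any Kafka Streams classes: `MaterializationSketch`, `Spec` and the
`optimizerWantsStore` flag are hypothetical names that only model the three
cases above, and the real decision logic may well look different.

```java
// Hypothetical model of the three table(...) cases; none of these names
// are Kafka Streams APIs.
final class MaterializationSketch {

    /** What the user passed to the (hypothetical) table(...) call. */
    enum Spec { NONE, STORE_NAME, STORE_SUPPLIER }

    /** Only a user-provided store supplier forces a physical store. */
    static boolean mustMaterialize(Spec spec) {
        return spec == Spec.STORE_SUPPLIER;
    }

    /** A name or a supplier makes the result queryable; no argument does not. */
    static boolean queryable(Spec spec) {
        return spec != Spec.NONE;
    }

    /**
     * Whether a physical store ends up being created. The flag
     * optimizerWantsStore stands in for whatever the library's
     * optimizations decide when the user did not force anything.
     */
    static boolean materializes(Spec spec, boolean optimizerWantsStore) {
        return mustMaterialize(spec) || optimizerWantsStore;
    }

    public static void main(String[] args) {
        for (Spec spec : Spec.values()) {
            System.out.printf("%-14s forced=%b queryable=%b%n",
                    spec, mustMaterialize(spec), queryable(spec));
        }
    }
}
```

The point of keeping `mustMaterialize` and `queryable` separate is that being
queryable does not imply a physical store: a named table could be served from
a logical view.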


Guozhang


On Wed, Jul 19, 2017 at 3:03 AM, Damian Guy <damian....@gmail.com> wrote:

> Hi Jan,
>
> Thanks for your input. Comments inline
>
> On Tue, 18 Jul 2017 at 15:21 Jan Filipiak <jan.filip...@trivago.com>
> wrote:
>
>> Hi,
>>
>>
>> 1. Too many overloads:
>> Currently, whenever a KTable is the result of an operation, it gets an
>> overload with stateStoreName and one with StateStoreSupplier in case
>> people want to query that.
>> This is what produces two thirds of the overloaded methods right now (not
>> counting methods returning KStream).
>>
>>
> As you state further down we are trying to address this.
>
>
>> 2. Code copy and pasting.
>> Almost all KTableProcessorSuppliers have the same block of (if(name !=
>> null) store.put(k,v))
>>
>>
> Yes, I agree. That is related to the KTable queryable store etc, and can
> easily be addressed, but isn't necessarily part of this as it doesn't need
> to be a public interface change, i.e., we can clean that up in the
> background.
>
>
>> 3. Runtime inefficiencies.
>> Each queryable table almost immediately requires another store, which
>> stores data equivalent to that of the upstream KTables.
>>
>
> Agreed. Again, this is not a public interface change. We don't need
> another store, i.e., we can just use a "View" on the existing store, which
> is basically just using the KTableValueGetter to apply the map or filter
> operation to the original store. We also have this jira
> https://issues.apache.org/jira/browse/KAFKA-5581 to look into optimizing
> when we do and don't need to add additional changelogs.
>
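
A rough illustration of the "View" idea mentioned above, as a plain-Java
sketch. This is not the actual KTableValueGetter interface: `StoreViewSketch`,
`ValueGetter`, `fromMap`, `mapValues` and `filter` are hypothetical names, and
a `HashMap` stands in for the upstream state store. The derived getters hold
no data of their own and apply the map or filter on every read.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch: a read-only view over one existing store.
final class StoreViewSketch {

    /** Read-only lookup, loosely modeled on the role of a value getter. */
    interface ValueGetter<K, V> {
        V get(K key);
    }

    /** The only place where data is physically stored. */
    static <K, V> ValueGetter<K, V> fromMap(Map<K, V> store) {
        return store::get;
    }

    /** View that applies the mapper at read time instead of storing results. */
    static <K, V, R> ValueGetter<K, R> mapValues(ValueGetter<K, V> upstream,
                                                 Function<V, R> mapper) {
        return key -> {
            V value = upstream.get(key);
            return value == null ? null : mapper.apply(value);
        };
    }

    /** View that hides entries whose value does not match the predicate. */
    static <K, V> ValueGetter<K, V> filter(ValueGetter<K, V> upstream,
                                           Predicate<V> predicate) {
        return key -> {
            V value = upstream.get(key);
            return (value != null && predicate.test(value)) ? value : null;
        };
    }

    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>();
        store.put("a", 3);
        store.put("b", 10);

        ValueGetter<String, Integer> base = fromMap(store);
        ValueGetter<String, Integer> doubled = mapValues(base, v -> v * 2);
        ValueGetter<String, Integer> view = filter(doubled, v -> v > 10);

        System.out.println("a -> " + view.get("a"));
        System.out.println("b -> " + view.get("b"));
    }
}
```

Because the view reads through to the single backing map, an update to the
upstream store is visible through the view without a second copy of the data.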
>
>>
>> So I really see us tackling only the first part currently, which in my
>> opinion is too short-sighted to settle on a public API.
>>
>
> We are not settling on the public API. We do, however, need to do KIPs for
> public API discussions. For internal changes we don't necessarily need to
> have a public discussion about it.
>
>
>> This is why I want to tackle our approach to IQ-first, as it seems to me
>> to be the most disruptive thing. And the cause of most problems.
>>
>> The Plan:
>>
>> Table from topic, kstream (don't even like this one, but probably needed
>> for some kind of enhanced flexibility) or aggregations would be the only
>> KTables that would get associated with a statestore (their processors).
>> For these operations one can have the "statestoresupplier" overload but
>> not the "queryablestatestore" overload. From this point on the KTable
>> abstraction would be considered restored.
>> All the overloads of join and through with respect to IQ would go away.
>> "through" would go completely maybe the benefit added is. The method I
>> would add is for a table to get a Queryhandle.
>> This query handle will underneath remember its table's processor name. To
>> access the data from IQ we would not rely on the "per processor
>> statestore" but go the usual path through ValueGetterSupplier.
>> *Note:* We do not necessarily have a Serde for V, especially after
>> mapValues, and also not for any intermediate data types. It would be each
>> KTableProcessor's job to provide a serialized version of upstream
>> data types.
>> KTableKTableJoin would need to bring a JoinInputSerializer<V1,V2> that
>> would serialize both upstream values for transport across boxes.
>>
>> This first step would kill all the "Storename" based overloads + many
>> Statestore overloads. It would also avoid the bloated copy pasting in
>> each KTableProcessor for maintaining the store.
>> It would also make the runtime more efficient in that it does not
>> store the same data twice just for access from IQ. This tackles problem
>> 1 but also the other two problems mentioned above.
>>
>> From here ~3 or 4 (from kstream, topic or aggregate) methods would still
>> be stuck with the StateStoreSupplier overload. For me, this is quite an
>> improvement already. To reduce overloads further, I am thinking of
>> adding a nullable properties argument to these operations. If people want
>> to use all defaults they could pass in null and it wouldn't be too painful.
>> That doesn't necessarily require
>> them to have config files lying around. They could, if they wanted, use
>> property files to create such properties, and we would offer to look for
>> configs in the streams properties.
>> So the complexity of distributing property files is optional and the
>> user might choose to fill the configs by code or files.
>>
>> I think these steps can rescue the proper abstraction of a KTable. I
>> believe that with the current proposals we are only sugarcoating problem
>> 1 and will end up with a broken idea of what a KTable is.
>> I think it will be even harder to develop further from there. Interface
>> wise my proposal is like developing backwards, as I am very certain we
>> took a wrong turn with IQ that we shouldn't try to carry through.
>>
>> I hope I could explain how this refactoring can tackle the 3 problems
>> above, and especially why I don't think we can win by tackling only
>> point 1 in the long run.
>> If anything needs an implementation draft, please feel free to ask
>> me to provide one. Initially the proposal would hopefully get the job
>> done of just removing clutter.
>>
>>
>
> I agree with some of what you have said in the above few paragraphs. I
> think you are correct in that KTable has become littered with a bunch of
> methods to make each stage queryable, i.e., adding the overloads
> for queryableStoreName and StateStoreSupplier. I think we can do away with
> both of them as once you have a KTable you can always build a view of it by
> using the KTableValueGetter. So we don't ever need a StateStoreSupplier as
> we have one already from when the original KTable was created. We can also
> possibly remove the overloads with queryableName and always use a generated
> name that can be retrieved from the method `String queryableStoreName` -
> this can then be used with IQ if needed.
>
> The getQueryHandle idea you mention won't really work as things stand. The
> KTable knows nothing about its runtime context; it is purely for building a
> topology that can be executed. In order to successfully query a `KTable`
> (state store) we need to know how many partitions and on which threads the
> state stores are running. This is why we added the `stores` API to
> `KafkaStreams` as this is the execution environment that has all of the
> information.
>
>
> Thanks,
> Damian
>
>> Looking forward to your comments.
>>
>> Best Jan
>>
>>
>>
>> On 12.07.2017 21:27, Guozhang Wang wrote:
>> > Hello Jan,
>> >
>> > Thanks for your feedback. Let me try to clarify a few things about the
>> > problems that we are trying to resolve and the motivations behind the
>> > current proposals.
>> >
>> > As Matthias mentioned, one issue that we are trying to tackle is to
>> > reduce the number of overloaded functions in the DSL caused by the serde
>> > overrides / state store supplier overrides that are needed for
>> > repartitioning or for state store materialization. Another related
>> > issue is that the current state store supplier override is not very
>> > natural to use, for example:
>> >
>> > 1) If a user just wants to disable caching / logging etc. but does not
>> > want to change the underlying store engine at all, she needs to
>> > know, for example, whether a windowed store or a key-value store is
>> > needed for this specific operator in the DSL and what serdes are needed
>> > to materialize the store, in order to create a StateStoreSupplier
>> > with caching / logging disabled and then pass it into the DSL.
>> >
>> > 2) Similarly, if a user just wants to set different topic configs for
>> > the changelog topic, she still needs to pass the whole
>> > StateStoreSupplier into the operator.
>> >
>> > 3) If a user wants to use a different store engine (e.g. MyStore rather
>> > than RocksDBStore) underneath but does not care about the default
>> > settings for logging, caching, etc., she STILL needs to pass the whole
>> > StateStoreSupplier into the operator.
>> >
>> > Note that all the above scenarios are for advanced users who do want
>> > to override these settings; users who are OK with the default
>> > settings should not be exposed to such APIs at all. As you
>> > said, "I do not be exposed with any of such implementation details",
>> > if you do not care.
>> >
>> > -----------------
>> >
>> > We have been talking about configs vs. code for such settings,
>> > since we have been using configs for "global" default configs; but the
>> > argument against using configs for such per-operator / per-store
>> > settings as well is that it will simply make configs hard to manage /
>> > hard to wire with tools. Personally speaking, I'm not a big fan of
>> > using configs for per-entity overrides, and that is mainly from my
>> > experience with Samza: Samza takes exactly the same approach for
>> > per-stream / per-source configs:
>> >
>> > http://samza.apache.org/learn/documentation/0.13/jobs/configuration-table.html
>> > ([system-name][stream-id] etc. are all placeholders)
>> >
>> > The main issues were: 1) users making config changes need to deploy
>> > them to all the instances. I think for Streams it would be even worse,
>> > as we need a config file on each of the running instances, and
>> > whenever there is a change we need to make sure it is propagated to
>> > all of them; 2) whenever users make some code changes, e.g. to add a
>> > new stream / system, they need to remember to make the corresponding
>> > changes in the config files as well, and they kept forgetting about it.
>> > The lesson learned there was that it is always better to change one
>> > place (code change) than two (code change + config file change).
>> >
>> > Again, this is not saying we have vetoed this option, and if people
>> > have good reasons for this let's discuss them here.
>> >
>> > -----------------
>> >
>> > So the current proposals are mainly around keeping configs for the
>> > global default settings, while still allowing users to override
>> > per-operator / per-store settings in the code, while also taking care
>> > not to force users to think about such implementation details if
>> > they are fine with whatever the default settings are. For example:
>> >
>> > As a normal user it is sufficient to specify a join as
>> >
>> > ```
>> > table4.join(table5, joiner).table();
>> > ```
>> >
>> > in which she can still just focus on the computational logic with all
>> > implementation details abstracted away; only if the user is familiar
>> > enough with the implementation details (e.g. how the joining tables
>> > are materialized into state stores, etc.) and wants to specify her own
>> > settings (e.g. I want to swap in my own state store engine, or I want
>> > to disable caching for dedup, or use a different serde, etc.) can she
>> > "explore" them with the DSL again:
>> >
>> > ```
>> > // use a custom store name for interactive query
>> > table4.join(table5, joiner).table(Materialized.as("store1"));
>> > // use a custom store engine
>> > table4.join(table5, joiner).table(Materialized.as(MyStoreSupplier));
>> > // use custom changelog topic configs for the store
>> > table4.join(table5, joiner)
>> >     .table(Materialized.as("store1").withLoggingEnabled(configs));
>> > // ... more
>> > ```
>> >
>> > Hope it helps.
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Fri, Jul 7, 2017 at 3:42 PM, Jan Filipiak <jan.filip...@trivago.com
>> > <mailto:jan.filip...@trivago.com>> wrote:
>> >
>> >     It makes me want to cry.
>> >
>> >     Why on earth is the DSL going to expose all its implementation
>> >     details now?
>> >     Especially being materialized or not.
>> >
>> >     If we want to take useful steps in that direction, maybe we are
>> >     looking for a way to let the user switch back and forth between
>> >     PAPI and DSL?
>> >
>> >     A change like the one proposed would not eliminate any of my pain
>> >     points while still being a heck of a lot of work to migrate to.
>> >
>> >     Since I am only following this from the point where Eno CC'ed it
>> >     into the users list:
>> >
>> >     Can someone please rephrase for me what problem this is trying to
>> >     solve? I don't mean to be rude, but it uses a problematic feature
>> >     "StateStoreSuppliers in DSL" to justify making it even worse. This
>> >     gets us nowhere in making the configs more flexible; it's just
>> >     syntactic sugar.
>> >
>> >     A low-effort shot would be something like: let's add a properties
>> >     argument to operations that would otherwise become too heavily
>> >     overloaded? Or pull the configs by some naming schema
>> >     from the overall properties. In addition to that we get rid of
>> >     StateStoreSuppliers in the DSL and have them also configured by
>> >     said properties.
>> >
>> >     => way easier to migrate to, way less risk, way more flexible in
>> >     the future (different implementations of the same operation don't
>> >     require code change to configure)
>> >
>> >     Line 184 makes especially no sense to me. What is a KTableKTable
>> >     non-materialized join anyway?
>> >
>> >     Hope we can discuss more on this.
>> >
>> >
>> >
>> >
>> >     On 07.07.2017 17:23, Guozhang Wang wrote:
>> >
>> >         I messed up the indentation in the GitHub code repo; this
>> >         would be easier to read:
>> >
>> >         https://codeshare.io/GLWW8K
>> >
>> >
>> >         Guozhang
>> >
>> >
>> >         On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang
>> >         <wangg...@gmail.com <mailto:wangg...@gmail.com>> wrote:
>> >
>> >             Hi Damian / Kyle,
>> >
>> >             I think I agree with you guys about the pros / cons of
>> >             using the builder pattern vs. using some "secondary
>> >             classes", and I'm wondering if we can take a middle way
>> >             between these two. I spent some time on a slightly
>> >             different approach from Damian's current proposal:
>> >
>> >             https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/java/org/apache/kafka/streams/RefactoredAPIs.java
>> >
>> >             The key idea is to tolerate the final "table()" or
>> >             "stream()" function to "upgrade" from the secondary
>> >             classes to the first-class citizen classes, while having
>> >             all the specs inside this function. This proposal also
>> >             includes some other refactoring that people have been
>> >             discussing for the builder, to reduce the overloaded
>> >             functions as well. WDYT?
>> >
>> >
>> >             Guozhang
>> >
>> >
>> >             On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy
>> >             <damian....@gmail.com <mailto:damian....@gmail.com>> wrote:
>> >
>> >                 Hi Jan,
>> >
>> >                 Thanks very much for the input.
>> >
>> >                 On Tue, 4 Jul 2017 at 08:54 Jan Filipiak
>> >                 <jan.filip...@trivago.com
>> >                 <mailto:jan.filip...@trivago.com>>
>>
>> >                 wrote:
>> >
>> >                     Hi Damian,
>> >
>> >                     I do see your point that something needs to change.
>> >                     But I fully agree with Guozhang when he says:
>> >                     ---
>> >
>> >                     But since this is an incompatibility change, and we
>> >                     are going to remove the compatibility annotations
>> >                     soon, it means we only have one chance and we
>> >                     really have to make it right.
>> >                     ----
>> >
>> >
>> >                 I think we all agree on this one! Hence the discussion.
>> >
>> >
>> >                     I fear all suggestions do not go far enough to
>> >                     become something that will carry on for very much
>> >                     longer.
>> >                     I am currently working on KAFKA-3705 and trying to
>> >                     find the easiest way for the user to give me all
>> >                     the required functionality. The easiest interface I
>> >                     could come up with so far can be looked at here:
>> >
>> >
>> >                     https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622
>> >
>> >
>> >                     And it's already horribly complicated. I am currently
>> >                     unable to find the right abstraction level to have
>> >                     everything falling into place naturally. To be
>> >                     honest, I already think introducing
>> >
>> >                 To be fair that is not a particularly easy problem to
>> >                 solve!
>> >
>> >
>> >                     https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L493
>> >
>> >                     was not ideal and makes everything a mess.
>> >
>> >
>> >                 I'm not sure I agree that it makes everything a mess,
>> >                 but it could have been done differently.
>> >
>> >                     The JoinType:Whatever is also not really flexible. 2
>> >                     things come to my mind:
>> >
>> >                     1. I don't think we should rule out config-based
>> >                     decisions, say configs like
>> >
>> >                      streams.$applicationID.joins.$joinname.conf = value
>> >
>> >                 Is this just for config? Or are you suggesting that we
>> >                 could somehow
>> >                 "code"
>> >                 the join in a config file?
>> >
>> >
>> >                     This can allow for tremendous changes without a
>> >                     single API change, and IMO it was not considered
>> >                     enough yet.
>> >
>> >                     2. Push logic from the DSL to the callback
>> >                     classes. A ValueJoiner, for example, can be used to
>> >                     implement different join types as the user wishes.
>> >
>> >                 Do you have an example of how this might look?
>> >
>> >
>> >                     As Guozhang said: to stop breaking users is very
>> >                     important.
>> >
>> >
>> >                 Of course. We want to make it as easy as possible for
>> >                 people to use
>> >                 streams.
>> >
>> >
>> >                     Especially with these changes + all the plans I
>> >                     sadly only have in my head, but hopefully the first
>> >                     link can give a glimpse.
>> >
>> >                     Thanks for preparing the examples; they made it way
>> >                     clearer to me what exactly we are talking about. I
>> >                     would argue to go a bit slower and more carefully on
>> >                     this one. At some point we need to get it right.
>> >                     Peeking over to the Hadoop guys with their huge
>> >                     user base: config files really work well for them.
>> >
>> >                     Best Jan
>> >
>> >
>> >
>> >
>> >
>> >                     On 30.06.2017 09:31, Damian Guy wrote:
>> >
>> >                         Thanks Matthias
>> >
>> >                         On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax
>> >                         <matth...@confluent.io
>> >                         <mailto:matth...@confluent.io>>
>>
>> >
>> >                     wrote:
>> >
>> >                             I am just catching up on this thread, so
>> >                             sorry in advance for the long email... Also,
>> >                             it's to some extent a dump of thoughts and
>> >                             not always a clear proposal. I still need to
>> >                             think about this in more detail. But maybe
>> >                             it helps others to get new ideas :)
>> >
>> >
>> >                                     However, I don't understand your
>> >                                     argument about putting aggregate()
>> >                                     after the withXX() -- all the
>> >                                     calls to withXX() set optional
>> >                                     parameters for aggregate() and not
>> >                                     for groupBy() -- but a
>> >                                     groupBy().withXX() indicates that
>> >                                     the withXX() belongs to the
>> >                                     groupBy(). IMHO, this might be
>> >                                     quite confusing for developers.
>> >
>> >
>> >                                 I see what you are saying, but the
>> >                                 grouped stream is effectively a no-op
>> >                                 until you call one of the
>> >                                 aggregate/count/reduce etc. functions.
>> >                                 So the optional params are ones that
>> >                                 are applicable to any of the
>> >                                 operations you can perform on this
>> >                                 grouped stream. Then the final
>> >                                 count()/reduce()/aggregate() call has
>> >                                 any of the params that are
>> >                                 required/specific to that function.
>> >
>> >                             I understand your argument, but I don't
>> >                             share the conclusion. If we need a
>> >                             "final/terminal" call, the better way
>> >                             might be
>> >
>> >                             .groupBy().count().withXX().build()
>> >
>> >                             (with a better name for build() though)
>> >
>> >
>> >                         The point is that all the other calls, i.e.,
>> >                         withBlah, windowed, etc. apply to all the
>> >                         aggregate functions. The terminal call is the
>> >                         actual type of aggregation you want to do. I
>> >                         personally find this more natural than
>> >                         groupBy().count().withBlah().build()
>> >
>> >
>> >                                 groupedStream.count(/** non windowed count **/)
>> >                                 groupedStream.windowed(TimeWindows.of(10L)).count(...)
>> >                                 groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
>> >
>> >                             I like this. However, I don't see a reason
>> >                             to have windowed() and sessionWindowed().
>> >                             We should have one top-level `Windows`
>> >                             interface that both `TimeWindows` and
>> >                             `SessionWindows` implement and just have a
>> >                             single windowed() method that accepts all
>> >                             `Windows`. (I did not like the separation
>> >                             of `SessionWindows` in the first place, and
>> >                             this seems to be an opportunity to clean
>> >                             this up. It was hard to change when we
>> >                             introduced session windows.)
>> >
>> >                         Yes - true we should look into that.
>> >
>> >
>> >                             Btw: we do use the imperative groupBy()
>> >                             and groupByKey(), and thus we might also
>> >                             want to use windowBy() (instead of
>> >                             windowed()). Not sure how important this
>> >                             is, but it seems to be inconsistent
>> >                             otherwise.
>> >
>> >
>> >                         Makes sense
>> >
>> >
>> >                             About joins: I don't like
>> >                             .withJoinType(JoinType.LEFT) at all. I
>> >                             think defining an inner/left/outer join is
>> >                             not an optional argument but a first-class
>> >                             concept and should have a proper
>> >                             representation in the API (like the
>> >                             current methods join(), leftJoin(),
>> >                             outerJoin()).
>> >
>> >
>> >                         Yep, I did originally have it as a required
>> >                         param and maybe that is what we go with. It
>> >                         could have a default, but maybe that is
>> >                         confusing.
>> >
>> >
>> >
>> >                             About the two join API proposals, the
>> >                             second one has too much boilerplate code
>> >                             for my taste. Also, the actual join()
>> >                             operator has only one argument, which is
>> >                             weird to me, as in my thinking the main
>> >                             operator call should have one parameter
>> >                             per mandatory argument, but your proposal
>> >                             puts the mandatory arguments into the
>> >                             Joins.streamStreamJoin() call. This is far
>> >                             from intuitive IMHO.
>> >
>> >
>> >                         This is the builder pattern; you only need one
>> >                         param as the builder has captured all of the
>> >                         required and optional arguments.
>> >
>> >
>> >                             The first join proposal also seems to
>> >                             align better with the pattern suggested
>> >                             for aggregations, and having the same
>> >                             pattern for all operators is important (as
>> >                             you stated already).
>> >
>> >
>> >                         This is why I offered two alternatives when I
>> >                         started out. One is the builder pattern, the
>> >                         other is the more fluent pattern.
>> >
>> >
>> >                             Coming back to the config vs optional
>> >                             parameter. What about having a method
>> >                             withConfig[s](...) that allows putting in
>> >                             the configuration?
>> >
>> >
>> >                         Sure, it is currently called withLogConfig(),
>> >                         as that is the only thing that is really
>> >                         config.
>> >
>> >
>> >                             This also raises the question of whether
>> >                             until() is a windows property. Actually,
>> >                             until() seems to be a configuration
>> >                             parameter and thus should not have its
>> >                             own method.
>> >
>> >
>> >                         Hmmm, I don't agree. Until is a property of
>> >                         the window. It is going to be potentially
>> >                         different for every window operation you do in
>> >                         a streams app.
>> >
>> >
>> >                             Browsing through your example DSL branch, I
>> >                             also saw this one:
>> >
>> >                                 final KTable<Windowed<String>, Long> windowed =
>> >                                     groupedStream.counting()
>> >                                                  .windowed(TimeWindows.of(10L).until(10))
>> >                                                  .table();
>> >
>> >                             This is an interesting idea, and it reminds
>> >                             me of some feedback: "I wanted to count a
>> >                             stream, but there was no count() method --
>> >                             I first needed to figure out that I need
>> >                             to group the stream first to be able to
>> >                             count it. It does make sense in hindsight
>> >                             but was not obvious in the beginning".
>> >                             Thus, carrying out this thought, we could
>> >                             also do the following:
>> >
>> >                             stream.count().groupedBy().windowedBy().table();
>> >
>> >                             -> Note, I use "grouped" and "windowed"
>> >                             instead of imperative here,
>> >
>> >                 as
>> >
>> >                             it comes after the count()
>> >
>> >                             This would be more consistent than your
>> >                             proposal (that has grouping
>> >                             before but windowing after count()). It
>> >                             might even allow us to enrich
>> >                             the API with some syntactic sugar like
>> >                             `stream.count().table()` to
>> >
>> >                 get
>> >
>> >                             the overall count of all records (this
>> >                             would obviously not scale,
>> >
>> >                 but we
>> >
>> >                             could support it -- if not now, maybe
>> later).
>> >
>> >
>> >                         I guess i'd prefer
>> >                         stream.groupBy().windowBy().count()
>> >                         stream.groupBy().windowBy().reduce()
>> >                         stream.groupBy().count()
>> >
>> >                         As i said above, everything that happens
>> >                         before the final aggregate
>> >
>> >                 call
>> >
>> >                         can be applied to any of them. So it makes
>> >                         sense to me to do those
>> >
>> >                 things
>> >
>> >                         ahead of the final aggregate call.
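
The ordering preferred above (group, optionally window, then call the aggregate last) can be sketched with a toy in-memory model. This is only an illustration under stated assumptions: Event, ToyStream, ToyGrouped and ToyWindowed are hypothetical names invented here, not Kafka Streams classes, and the windowing is a simple tumbling window over non-negative timestamps.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical record type for the sketch: a key plus an event timestamp.
final class Event {
    final String key;
    final long timestampMs;

    Event(String key, long timestampMs) {
        this.key = key;
        this.timestampMs = timestampMs;
    }
}

// Hypothetical stream: the only thing it can do is hand out a grouped view.
final class ToyStream {
    private final List<Event> events;

    ToyStream(List<Event> events) {
        this.events = events;
    }

    ToyGrouped groupBy(Function<Event, String> keyFn) {
        return new ToyGrouped(events, keyFn);
    }
}

// Grouping does no work by itself; work happens in the terminal count().
final class ToyGrouped {
    private final List<Event> events;
    private final Function<Event, String> keyFn;

    ToyGrouped(List<Event> events, Function<Event, String> keyFn) {
        this.events = events;
        this.keyFn = keyFn;
    }

    ToyWindowed windowBy(long sizeMs) {
        return new ToyWindowed(events, keyFn, sizeMs);
    }

    // Terminal call: count per key over all events.
    Map<String, Long> count() {
        Map<String, Long> counts = new TreeMap<>();
        for (Event e : events) {
            counts.merge(keyFn.apply(e), 1L, Long::sum);
        }
        return counts;
    }
}

// Windowing is also just configuration until the terminal count().
final class ToyWindowed {
    private final List<Event> events;
    private final Function<Event, String> keyFn;
    private final long sizeMs;

    ToyWindowed(List<Event> events, Function<Event, String> keyFn, long sizeMs) {
        this.events = events;
        this.keyFn = keyFn;
        this.sizeMs = sizeMs;
    }

    // Terminal call: count per (key, tumbling window start).
    Map<String, Long> count() {
        Map<String, Long> counts = new TreeMap<>();
        for (Event e : events) {
            long windowStart = (e.timestampMs / sizeMs) * sizeMs;
            counts.merge(keyFn.apply(e) + "@" + windowStart, 1L, Long::sum);
        }
        return counts;
    }
}

class GroupThenAggregateDemo {
    public static void main(String[] args) {
        ToyStream stream = new ToyStream(Arrays.asList(
                new Event("a", 1), new Event("a", 5), new Event("b", 7), new Event("a", 12)));
        System.out.println(stream.groupBy(e -> e.key).count());
        System.out.println(stream.groupBy(e -> e.key).windowBy(10).count());
    }
}
```

In this sketch the same grouped object feeds both the plain and the windowed count, which is the reuse property the ordering is meant to preserve.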
>> >
>> >
>> >                             Last about builder pattern. I am convinced
>> >                             that we need some
>> >
>> >                 "terminal"
>> >
>> >                             operator/method that tells us when to add
>> >                             the processor to the
>> >
>> >                 topology.
>> >
>> >                             But I don't see the need for a plain
>> >                             builder pattern that feels
>> >
>> >                 alien to
>> >
>> >                             me (see my argument about the second join
>> >                             proposal). Using .stream()
>> >
>> >                 /
>> >
>> >                             .table() as used in many examples might
>> >                             work. But maybe a more generic
>> >                             name that we can use in all places like
>> >                             build() or apply() might
>> >
>> >                 also be
>> >
>> >                             an option.
>> >
>> >
>> >                         Sure, a generic name might be ok.
>> >
>> >
>> >
>> >
>> >                             -Matthias
>> >
>> >
>> >
>> >                             On 6/29/17 7:37 AM, Damian Guy wrote:
>> >
>> >                                 Thanks Kyle.
>> >
>> >                                 On Thu, 29 Jun 2017 at 15:11 Kyle
>> >                                 Winkelman <
>> >
>> >                 winkelman.k...@gmail.com
>> >                 <mailto:winkelman.k...@gmail.com>>
>>
>> >
>> >                                 wrote:
>> >
>> >                                     Hi Damian,
>> >
>> >                                                     When trying to
>> >                                                     program in the
>> >                                                     fluent API that
>> >                                                     has been
>> >
>> >                 discussed
>> >
>> >                             most
>> >
>> >                                     it
>> >
>> >                                                     feels difficult to
>> >                                                     know when you will
>> >                                                     actually get an
>> object
>> >
>> >                 you
>> >
>> >                     can
>> >
>> >                                     reuse.
>> >
>> >                                                     What if I make one
>> >                                                     KGroupedStream
>> >                                                     that I want to
>> >                                                     reuse, is it
>> >
>> >                     legal
>> >
>> >                             to
>> >
>> >                                                     reuse it or does
>> >                                                     this approach
>> >                                                     expect you to call
>> >                                                     grouped each
>> >
>> >                     time?
>> >
>> >                                             I'd anticipate that once
>> >                                             you have a KGroupedStream
>> >                                             you can
>> >
>> >                 re-use it
>> >
>> >                             as
>> >
>> >                                     you
>> >
>> >                                             can today.
>> >
>> >                                     You said it yourself in another
>> >                                     post that the grouped stream is
>> >                                     effectively a no-op until a count,
>> >                                     reduce, or aggregate. The way I
>> >
>> >                 see
>> >
>> >                             it
>> >
>> >                                     you wouldn’t be able to reuse
>> >                                     anything except KStreams and
>> KTables,
>> >
>> >                             because
>> >
>> >                                     most of this fluent api would
>> >                                     continue returning this (this being
>> >
>> >                 the
>> >
>> >                                     builder object currently being
>> >                                     manipulated).
>> >
>> >                                 So, if you ever store a reference to
>> >                                 anything but KStreams and
>> >
>> >                 KTables
>> >
>> >                             and
>> >
>> >                                     you use it in two different ways
>> >                                     then it's possible you make
>> >
>> >                     conflicting
>> >
>> >                                     withXXX() calls on the same builder.
>> >
>> >
>> >                                 Not necessarily true. It could return a
>> >                                 new instance of the builder,
>> >
>> >                     i.e.,
>> >
>> >                                 the builders being immutable. So if
>> >                                 you held a reference to the
>> >
>> >                 builder
>> >
>> >                             it
>> >
>> >                                 would always be the same as it was
>> >                                 when it was created.
>> >
>> >
>> >                                     GroupedStream<K,V>
>> >                                     groupedStreamWithDefaultSerdes =
>> >
>> >                 kStream.grouped();
>> >
>> >                                     GroupedStream<K,V>
>> >                                     groupedStreamWithDeclaredSerdes =
>> >                                     groupedStreamWithDefaultSerdes
>> >                                         .withKeySerde(…).withValueSerde(…);
>> >
>> >                                     I’ll admit that this shouldn’t
>> >                                     happen but some user is going to do
>> >
>> >                 it
>> >
>> >                                     eventually…
>> >                                     Depending on implementation uses
>> >                                     of groupedStreamWithDefaultSerdes
>> >
>> >                     would
>> >
>> >                                     most likely be equivalent to the
>> >                                     version withDeclaredSerdes. One
>> >
>> >                 work
>> >
>> >                                     around would be to always make
>> >                                     copies of the config objects you are
>> >                                     building, but this approach has
>> >                                     its own problem because now we
>> >
>> >                 have to
>> >
>> >                                     identify which configs are
>> >                                     equivalent so we don’t create
>> repeated
>> >                                     processors.
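
The "immutable builder" idea raised in this exchange can be sketched as follows. This is a minimal illustration, not the Kafka Streams API: GroupedConfig and its methods are hypothetical, and serdes are represented as plain strings to keep the example self-contained. Each withXXX() call returns a new object, so a reference held earlier keeps the settings it had when it was created.

```java
import java.util.Objects;

// Hypothetical immutable configuration object standing in for a grouped-stream builder.
final class GroupedConfig {
    private final String keySerde;
    private final String valueSerde;

    private GroupedConfig(String keySerde, String valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // Starting point that uses the (assumed) default serdes.
    static GroupedConfig defaults() {
        return new GroupedConfig("default", "default");
    }

    // Returns a copy with the key serde replaced; 'this' is never modified.
    GroupedConfig withKeySerde(String serde) {
        return new GroupedConfig(Objects.requireNonNull(serde), valueSerde);
    }

    // Returns a copy with the value serde replaced; 'this' is never modified.
    GroupedConfig withValueSerde(String serde) {
        return new GroupedConfig(keySerde, Objects.requireNonNull(serde));
    }

    String keySerde() {
        return keySerde;
    }

    String valueSerde() {
        return valueSerde;
    }
}

class ImmutableBuilderDemo {
    public static void main(String[] args) {
        GroupedConfig withDefaultSerdes = GroupedConfig.defaults();
        GroupedConfig withDeclaredSerdes =
                withDefaultSerdes.withKeySerde("string").withValueSerde("long");

        // The first reference is unaffected by the later withXXX() calls.
        System.out.println(withDefaultSerdes.keySerde() + "/" + withDefaultSerdes.valueSerde());
        System.out.println(withDeclaredSerdes.keySerde() + "/" + withDeclaredSerdes.valueSerde());
    }
}
```

The sketch does not address the follow-up concern about detecting equivalent configs to avoid duplicate processors; that would need value-based equality on the config, which is left out here.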
>> >
>> >                                     The point of this long winded
>> >                                     example is that we always have to be
>> >                                     thinking about all of the possible
>> >                                     ways it could be misused by a
>> >
>> >                 user
>> >
>> >                                     (causing them to see hard to
>> >                                     diagnose problems).
>> >
>> >                                 Exactly! That is the point of the
>> >                                 discussion really.
>> >
>> >
>> >                                     In my attempt at a couple methods
>> >                                     with builders I feel that I could
>> >                                     confidently say the user couldn’t
>> >                                     really mess it up.
>> >
>> >                                         // Count
>> >                                         KTable<String, Long> count =
>> >
>> >                     kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));
>> >
>> >                                     The kGroupedStream is reusable and
>> >                                     if they attempted to reuse the
>> >
>> >                     Count
>> >
>> >                                     for some reason it would throw an
>> >                                     error message saying that a store
>> >
>> >                             named
>> >
>> >                                     “my-store” already exists.
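
The failure mode described here (reusing a Count with the same store name is rejected) can be sketched as below. All names are hypothetical stand-ins: CountSpec plays the role of the proposed Count builder, and ToyGroupedStream tracks registered store names itself, whereas a real implementation would presumably track them in the topology. The exception type and message are also assumptions.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the proposed Count builder; immutable by construction.
final class CountSpec {
    private final String storeName;

    private CountSpec(String storeName) {
        this.storeName = storeName;
    }

    static CountSpec count() {
        return new CountSpec(null);
    }

    CountSpec withQueryableStoreName(String name) {
        return new CountSpec(name);
    }

    String storeName() {
        return storeName;
    }
}

// Hypothetical grouped stream that remembers which store names are already taken.
final class ToyGroupedStream {
    private final Set<String> registeredStores = new HashSet<>();

    // Returns the name of the store backing the count; rejects duplicate names.
    String count(CountSpec spec) {
        String name = spec.storeName();
        if (name != null && !registeredStores.add(name)) {
            throw new IllegalStateException("A store named \"" + name + "\" already exists");
        }
        return name == null ? "<internal store>" : name;
    }
}

class StoreNameClashDemo {
    public static void main(String[] args) {
        ToyGroupedStream grouped = new ToyGroupedStream();
        CountSpec spec = CountSpec.count().withQueryableStoreName("my-store");

        System.out.println(grouped.count(spec)); // first use registers the store
        try {
            grouped.count(spec); // second use of the same spec is rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The grouped stream itself stays reusable in this sketch; only the duplicate store name is treated as an error.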
>> >
>> >
>> >                                 Yes i agree and i think using builders
>> >                                 is my preferred pattern.
>> >
>> >                                 Cheers,
>> >                                 Damian
>> >
>> >
>> >                                     Thanks,
>> >                                     Kyle
>> >
>> >                                     From: Damian Guy
>> >                                     Sent: Thursday, June 29, 2017 3:59
>> AM
>> >                                     To: d...@kafka.apache.org
>> >                                     <mailto:d...@kafka.apache.org>
>> >                                     Subject: Re: [DISCUSS] Streams
>> >                                     DSL/StateStore Refactoring
>> >
>> >                                     Hi Kyle,
>> >
>> >                                     Thanks for your input. Really
>> >                                     appreciated.
>> >
>> >                                     On Thu, 29 Jun 2017 at 06:09 Kyle
>> >                                     Winkelman <
>> >
>> >                 winkelman.k...@gmail.com <mailto:winkelman.kyle@gmail.
>> com>
>> >
>> >                                     wrote:
>> >
>> >                                         I like more of a builder
>> >                                         pattern even though others
>> >                                         have voiced
>> >
>> >                     against
>> >
>> >                                         it. The reason I like it is
>> >                                         because it makes it clear to
>> >                                         the user
>> >
>> >                     that
>> >
>> >                             a
>> >
>> >                                         call to KGroupedStream#count
>> >                                         will return a KTable not some
>> >
>> >                     intermediate
>> >
>> >                                         class that I need to understand.
>> >
>> >                                     Yes, that makes sense.
>> >
>> >
>> >                                         When trying to program in the
>> >                                         fluent API that has been
>> discussed
>> >
>> >                 most
>> >
>> >                             it
>> >
>> >                                         feels difficult to know when
>> >                                         you will actually get an
>> >                                         object you
>> >
>> >                 can
>> >
>> >                                     reuse.
>> >
>> >                                         What if I make one
>> >                                         KGroupedStream that I want to
>> >                                         reuse, is it
>> >
>> >                 legal
>> >
>> >                     to
>> >
>> >                                         reuse it or does this approach
>> >                                         expect you to call grouped each
>> >
>> >                 time?
>> >
>> >                                     I'd anticipate that once you have
>> >                                     a KGroupedStream you can re-use
>> >
>> >                 it
>> >
>> >                     as
>> >
>> >                             you
>> >
>> >                                     can today.
>> >
>> >
>> >                                         This question doesn’t pop into
>> >                                         my head at all in the builder
>> >
>> >                 pattern
>> >
>> >                     I
>> >
>> >                                         assume I can reuse everything.
>> >                                         Finally, I like .groupByKey
>> >                                         and .groupBy(KeyValueMapper)
>> >                                         not a big
>> >
>> >                     fan
>> >
>> >                             of
>> >
>> >                                         the grouped.
>> >
>> >                                         Yes, grouped() was more for
>> >                                         demonstration and because
>> >                                         groupBy()
>> >
>> >                 and
>> >
>> >                                     groupByKey() were taken! So i'd
>> >                                     imagine the api would actually
>> >
>> >                 want to
>> >
>> >                             be
>> >
>> >                                     groupByKey(/** no required
>> >                                     args***/).withOptionalArg() and
>> >                                     groupBy(KeyValueMapper
>> >                                     m).withOptionalArg(...) of
>> >                                     course this all
>> >
>> >                             depends
>> >
>> >                                     on maintaining backward
>> compatibility.
>> >
>> >
>> >                                         Unfortunately, the below
>> >                                         approach would require at least 2
>> >
>> >                 (probably
>> >
>> >                     3)
>> >
>> >                                         overloads (one for returning a
>> >                                         KTable and one for returning a
>> >
>> >                 KTable
>> >
>> >                             with
>> >
>> >                                         Windowed Key, probably would
>> >                                         want to split windowed and
>> >
>> >                     sessionwindowed
>> >
>> >                                     for
>> >
>> >                                         ease of implementation) of
>> >                                         each count, reduce, and
>> aggregate.
>> >                                         Obviously not exhaustive but
>> >                                         enough for you to get the
>> picture.
>> >
>> >                     Count,
>> >
>> >                                         Reduce, and Aggregate supply 3
>> >                                         static methods to initialize the
>> >
>> >                             builder:
>> >
>> >                                         // Count
>> >                                         KTable<String, Long> count =
>> >
>> >                     groupedStream.count(Count.count().withQueryableStoreName("my-store"));
>> >
>> >                                         // Windowed Count
>> >                                         KTable<Windowed<String>, Long>
>> >                                         windowedCount =
>> >
>> >                     groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10))
>> >                         .withQueryableStoreName("my-windowed-store"));
>> >
>> >                                         // Session Count
>> >                                         KTable<Windowed<String>, Long>
>> >                                         sessionCount =
>> >
>> >                     groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L))
>> >                         .withQueryableStoreName("my-session-windowed-store"));
>> >
>> >                                     Above and below, i think i'd
>> >                                     prefer it to be:
>> >                                     groupedStream.count(/** non
>> >                                     windowed count**/)
>> >                                     groupedStream.windowed(TimeWindows.of(10L)).count(...)
>> >                                     groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
>> >
>> >
>> >
>> >
>> >                                         // Reduce
>> >                                         Reducer<Long> reducer;
>> >                                         KTable<String, Long> reduce =
>> >                                         groupedStream.reduce(reducer,
>> >                                         Reduce.reduce().withQueryableStoreName("my-store"));
>> >
>> >                                         // Aggregate Windowed with
>> >                                         Custom Store
>> >                                         Initializer<String> initializer;
>> >                                         Aggregator<String, Long,
>> >                                         String> aggregator;
>> >                                         KTable<Windowed<String>,
>> >                                         String> aggregate =
>> >                                         groupedStream.aggregate(initializer, aggregator,
>> >                                             Aggregate.windowed(TimeWindows.of(10L).until(10))
>> >                                                 .withStateStoreSupplier(stateStoreSupplier));
>> >
>> >                                         // Cogroup SessionWindowed
>> >                                         KTable<String, String> cogrouped =
>> >
>> >                     groupedStream1.cogroup(aggregator1)
>> >
>> >
>> >                                         .cogroup(groupedStream2,
>> >                                         aggregator2)
>> >
>> >
>
>


-- 
-- Guozhang
