Hi Damian,

I feel that if we go with the builder method described below, we are moving away from a declarative DSL.
Eno

> On 30 Jan 2017, at 09:38, Damian Guy <damian....@gmail.com> wrote:
>
> How about something a bit different? We could pass builders to all the KTable methods, so we would have something like this:
>
> ktable.filter(new FilteredTableBuilder(predicate)
>         .materializeAs(someName));
>
> ktable.join(new TableJoinBuilder(otherTable, valueJoiner)
>         .left()
>         .materializeAs("join-store")
>         .withCaching()
>         .withLogging(logConfig));
>
> etc.
>
> We could then deprecate all existing methods on KTable and eventually remove them, so that we have no overloaded methods. Additionally, the builder pattern gives us the opportunity to provide a couple of other things people have asked for, i.e., control over how the changelog (and other) topics are created, and per-store caching. There are probably other things, too.
>
> On Mon, 30 Jan 2017 at 08:15 Matthias J. Sax <matth...@confluent.io> wrote:
>
>> cc from user list
>>
>> -------- Forwarded Message --------
>> Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved semantics
>> Date: Mon, 30 Jan 2017 00:06:37 -0800
>> From: Matthias J. Sax <matth...@confluent.io>
>> Organization: Confluent Inc
>> To: us...@kafka.apache.org
>>
>> I understand point (1) about when materialization happens. But I cannot follow your conclusion about how this should influence the DSL, because I don't see a functional difference between "provide a store name in an overload" and "call .materialize()" -- both mechanisms can do exactly the same thing.
>>
>> I also do not understand why we need to force users to specify a store name in order to use IQ. Even if store names are used internally, we can completely abstract this away from users.
>>
>> To me, the question of DSL design should be reduced to what a developer cares about. And most likely, she does not care about internals -- if one wants to query a KTable, using the "name" is an unnecessary detour in the thought process.
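Damian's builder idea can be sketched in plain Java to show what such an options object would carry. This is a minimal, hypothetical illustration: `TableJoinOptions` and its nested `Builder` are invented names (not Kafka Streams classes), and the sketch only records the caller's choices; it does not build a topology.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical options object in the spirit of the proposed TableJoinBuilder.
// It only records the caller's choices; it is not a Kafka Streams API.
final class TableJoinOptions {
    private final boolean left;
    private final String storeName; // null means "let the library generate a name"
    private final boolean caching;
    private final Map<String, String> logConfig;

    private TableJoinOptions(Builder b) {
        this.left = b.left;
        this.storeName = b.storeName;
        this.caching = b.caching;
        this.logConfig = Collections.unmodifiableMap(new HashMap<>(b.logConfig));
    }

    static Builder builder() {
        return new Builder();
    }

    boolean isLeft() { return left; }

    Optional<String> storeName() { return Optional.ofNullable(storeName); }

    boolean cachingEnabled() { return caching; }

    Map<String, String> logConfig() { return logConfig; }

    static final class Builder {
        private boolean left;
        private String storeName;
        private boolean caching;
        private final Map<String, String> logConfig = new HashMap<>();

        Builder left() {
            this.left = true; // request a left join rather than an inner join
            return this;
        }

        Builder materializeAs(String name) {
            if (name == null || name.isEmpty()) {
                throw new IllegalArgumentException("store name must be non-empty");
            }
            this.storeName = name;
            return this;
        }

        Builder withCaching() {
            this.caching = true;
            return this;
        }

        Builder withLogging(Map<String, String> changelogConfig) {
            this.logConfig.putAll(changelogConfig); // e.g. changelog topic settings
            return this;
        }

        TableJoinOptions build() {
            return new TableJoinOptions(this);
        }
    }
}
```

Usage might look like `TableJoinOptions.builder().left().materializeAs("join-store").withCaching().build()`. Because every option lives on the builder, new knobs (per-store caching, changelog topic settings) could be added without adding overloads to KTable, which is the trade-off Damian is pointing at.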
>>
>> Currently, the code would be something like this:
>>
>>> KStreamBuilder builder = ...
>>> KTable<String, Long> table = ... // requires specifying "storeName"
>>> KafkaStreams streams = ...
>>>
>>> ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
>>> String key = "some-key";
>>> Long someValue = localStore.get(key);
>>
>> but I think we can simplify this to
>>
>>> KStreamBuilder builder = ...
>>> KTable<String, Long> table = ... // no "storeName" required -- it can be generated internally (if the user cares, they can optionally provide a storeName)
>>> KafkaStreams streams = ...
>>>
>>> String key = "some-key";
>>> Long someValue = table.get(key);
>>
>> Of course, the call to table.get() is only valid if the store can be queried. But the same holds for the current approach -- if streams.store() gets a wrong storeName or wrong QueryableStoreTypes, it will not find a store and will fail -- thus we can also fail if a non-queryable KTable gets queried. Furthermore, the user does not need to worry about the potentially confusing QueryableStoreTypes parameter (if a KTable gets queried, the type is fixed anyway).
>>
>> Thus, we don't need to force a user to specify a store name. Furthermore, we can add all kinds of handy methods for the user, like:
>>
>> KTable#withStoreName(String storeName)
>> KTable#isQueryable()
>> KTable#getStoreName() // if somebody has DIY IQ code
>>
>> and, if we want to allow forcing materialization:
>>
>> KTable#materialize()
>> KTable#materialize(String storeName)
>>
>> In a different JIRA there is the idea to allow people to provide changelog configs. That could also easily be handled with new methods (instead of more overloads):
>>
>> KTable#withChangelogConfig(Map config)
>>
>> The main point I want to bring up is that this is a two-fold discussion: (1) DSL design itself and (2) KTable materialization and IQ strategy.
>> And both seem to be independent. Nevertheless, because (2) might require API changes, we should discuss both together to avoid two rounds of API changes.
>>
>> -Matthias
>>
>> On 1/29/17 7:56 PM, Guozhang Wang wrote:
>>> Thinking out loud here about the API options (materialize() vs. overloaded functions) and their impact on IQ:
>>>
>>> 1. The first issue with the current DSL is that it is inconsistent about whether / how KTables should be materialized:
>>>
>>>     a) in many cases the library HAS TO materialize KTables no matter what, e.g. KTables resulting from KStream / KTable aggregations, and hence we force users to provide store names and throw an RTE if the name is null;
>>>     b) in some other cases, the KTable may or may not be materialized; for example, in KStreamBuilder.table() the store name can be null, in which case the KTable is not materialized;
>>>     c) in yet other cases, the KTable will never be materialized, for example KTables resulting from KTable.filter(), and users have no option to force materialization;
>>>     d) related to a), some KTables are required to be materialized, but we do not force users to provide a state store name, e.g. KTables involved in joins; in this case an RTE is thrown not immediately but later.
>>>
>>> 2. The second issue is related to IQ, where state stores are accessed by their state store names, so only KTables with user-specified state store names are queryable. But because of 1) above, many stores may not be of interest to users for IQ, yet users still need to provide a (dummy?) state store name for them; on the other hand, users cannot query some state stores, e.g. the ones generated by KTable.filter(), as there are no APIs for them to specify a state store name.
>>>
>>> 3.
>>> We are aware from user feedback that such backend details would better be abstracted away from the DSL layer, where app developers should just focus on processing logic, while state stores along with their changelogs etc. would better be handled by a different mechanism; the same arguments have been made for serdes / windowing triggers as well. For serdes specifically, we had a very long discussion and concluded that, at least in Java 7, we cannot completely abstract serdes away in the DSL, so we chose the other extreme: forcing users to be completely aware of the serde requirements when some KTables may need to be materialized, via overloaded API functions. For state store names, however, I feel the argument is different from serdes (details below).
>>>
>>> So to me, for either direction (materialize() vs. overloaded functions), the first thing I'd like to resolve is the inconsistency issue mentioned above. So in either case: KTable materialization will not be affected by whether the user provides a state store name, but will only be decided by the library when it is necessary. More specifically, only KTables resulting from joins and from builder.table() are not always materialized, but they are still likely to be materialized lazily (e.g. when participating in a join operator).
>>>
>>> For overloaded functions that would mean:
>>>
>>>     a) we have an overloaded function for ALL operators that could result in a KTable, and allow the store name to be null (i.e. for the function without this param it is null by default);
>>>     b) a null state store name does not indicate that a KTable will not be materialized, but that it will not be used for IQ at all (internal state store names will be generated when necessary).
>>>
>>> For materialize() that would mean:
>>>
>>>     a) we will remove state store names from ALL operators that could result in a KTable.
>>>     b) not calling materialize() on a KTable does not indicate that it will not be materialized, but that it will not be used for IQ at all (internal state store names will be generated when necessary).
>>>
>>> Again, either way the API itself does not "hint" anything about whether a KTable is materialized; for now that is still determined purely by the library when parsing the DSL.
>>>
>>> Following these thoughts, I feel that 1) we should probably change the name "materialize", since it may mislead users about what actually happens behind the scenes, to e.g. Damian's suggested "queryableStore(String storeName)", which returns a QueryableStateStore and can replace the `KafkaStreams.store` function; 2) comparing the two options, assuming we get rid of the misleading function name, I personally favor not adding more overloaded functions, as that keeps the API simpler.
>>>
>>> Guozhang
>>>
>>> On Sat, Jan 28, 2017 at 2:32 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> thanks for your mail, I feel it can clarify some things! The thread unfortunately split, but as all branches close in on what my suggestion was about, I'll pick this one to continue.
>>>>
>>>> Of course only the table the user wants to query would be materialized (retrieving the query handle implies materialization). So in the example of KTable::filter, if you call getIQHandle on both tables, only the one source that is there would materialize, and the QueryHandle abstraction would make sure it gets mapped and filtered and whatnot upon read, as usual.
>>>>
>>>> Of course the object you retrieve would maybe only wrap the storeName / table unique identifier and a way to access the streams instance, and then basically use the same mechanism that is currently used.
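Matthias's `table.get(key)` idea and Jan's query handle both come down to an object that carries a store name plus a way to reach the running instance. A minimal sketch under that assumption follows; `LocalStores` and `TableHandle` are hypothetical stand-ins (a plain map replaces the real `KafkaStreams` instance), and the two failure cases mirror the ones described in the thread: querying a table that was never materialized, or a store that is not available.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a running streams instance: store name -> local store.
final class LocalStores {
    private final Map<String, Map<String, Long>> byName = new HashMap<>();

    void put(String storeName, Map<String, Long> store) {
        byName.put(storeName, store);
    }

    Map<String, Long> find(String storeName) {
        return byName.get(storeName); // null if no such store on this instance
    }
}

// Hypothetical table handle: holds only an optional store name and a reference
// to the instance, and resolves the store on every query.
final class TableHandle {
    private final String storeName; // null if the table was never materialized
    private final LocalStores stores;

    TableHandle(String storeName, LocalStores stores) {
        this.storeName = storeName;
        this.stores = stores;
    }

    boolean isQueryable() {
        return storeName != null;
    }

    String getStoreName() {
        return storeName;
    }

    Long get(String key) {
        if (!isQueryable()) {
            throw new IllegalStateException("table is not materialized, so it cannot be queried");
        }
        Map<String, Long> store = stores.find(storeName);
        if (store == null) {
            // Comparable to today's failure when streams.store() is given an unknown name.
            throw new IllegalStateException("store " + storeName + " is not available");
        }
        return store.get(key);
    }
}
```

In this shape the user never passes a store name or a store type at query time; the handle already knows both, which is the simplification Matthias argues for.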
>>>> From my point of view this is the least confusing way for DSL users. If it's too tricky to get hold of the streams instance, one could ask the user to pass it in before executing queries, thereby making sure the streams instance has been built.
>>>>
>>>> The effort to implement this is indeed some orders of magnitude higher than the overloaded materialized call. As long as I could help provide a different view, I am happy.
>>>>
>>>> Best Jan
>>>>
>>>> On 28.01.2017 09:36, Eno Thereska wrote:
>>>>
>>>>> Hi Jan,
>>>>>
>>>>> I understand your concern. One implication of not passing any store name and just getting an IQ handle is that all KTables would need to be materialised. Currently the store name (or the proposed .materialize() call) acts as a hint on whether to materialise the KTable or not. Materialising every KTable can be expensive, although there are some tricks one can play, e.g., having a virtual store rather than one backed by a Kafka topic.
>>>>>
>>>>> However, even with the above, after getting an IQ handle the user would still need to use IQ APIs to query the state. As such, we would still be outside the original DSL, so this wouldn't address your original concern.
>>>>>
>>>>> So I read this suggestion as simplifying the APIs by removing the store name, at the cost of having to materialise every KTable. It's definitely an option we'll consider as part of this KIP.
>>>>>
>>>>> Thanks
>>>>> Eno
>>>>>
>>>>> On 28 Jan 2017, at 06:49, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>
>>>>>> Hi, exactly.
>>>>>>
>>>>>> I know it works from the Processor API, but my suggestion would spare DSL users from dealing with store names whatsoever.
>>>>>>
>>>>>> In general I am in favor of switching between DSL and Processor API easily.
>>>>>> (In my Streams applications I do this a lot, using reflection and instantiating KTableImpl.) Concerning this KIP, all I am saying is that there should be a DSL concept of "I want to expose this __KTable__". This could be a method like KTable::retrieveIQHandle():InteractiveQueryHandle; the table would know to materialize, and the user would have a reference to the store and the distributed query mechanism through the InteractiveQueryHandle. Under the hood it can use the same mechanism as the PAPI people.
>>>>>>
>>>>>> I hope you see my point J
>>>>>>
>>>>>> Best Jan
>>>>>>
>>>>>> #DeathToIQMoreAndBetterConnectors :)
>>>>>>
>>>>>> On 27.01.2017 21:59, Matthias J. Sax wrote:
>>>>>>
>>>>>>> Jan,
>>>>>>>
>>>>>>> the IQ feature is not limited to the Streams DSL but can also be used for stores used in the PAPI. Thus, we need a mechanism that works for both PAPI and DSL.
>>>>>>>
>>>>>>> Nevertheless I see your point, and I think we could provide a better API for KTable stores, including the discovery of remote shards of the same KTable.
>>>>>>>
>>>>>>> @Michael: Yes, right now we have a lot of overloads and I am not a big fan of those -- I would rather prefer a builder pattern. But that might be a different discussion (nevertheless, if we aim for an API rework, we should get the changes with regard to stores right from the beginning, in order to avoid a redesign later on).
>>>>>>>
>>>>>>> something like:
>>>>>>>
>>>>>>> stream.groupByKey()
>>>>>>>       .window(TimeWindows.of(5000))
>>>>>>>       .aggregate(...)
>>>>>>>       .withAggValueSerde(new CustomTypeSerde())
>>>>>>>       .withStoreName("storeName");
>>>>>>>
>>>>>>> (This would also reduce JavaDoc redundancy -- maybe a personal pain point right now :))
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 1/27/17 11:10 AM, Jan Filipiak wrote:
>>>>>>>
>>>>>>>> Yeah,
>>>>>>>>
>>>>>>>> Maybe it's my bad that I refuse to look into IQ, as I don't find it anywhere close to being interesting. The problem IMO is that people need to know the store name, so we are working on different levels to achieve a single goal.
>>>>>>>>
>>>>>>>> What is your opinion on having a method on KTable that returns something like a key-value store? There are of course problems like "it can't be used before the stream threads are running and group membership is established...", but the benefit would be that the user has a consistent way of saying "Hey, I need it materialized because queries are coming" and already gets a thing they can execute the queries on, in one step.
>>>>>>>>
>>>>>>>> What I think is unintuitive here is that you need to call materialize on this KTable, then go somewhere else to find its store name, and then go to the KafkaStreams instance and ask for the store with this name.
>>>>>>>>
>>>>>>>> So one could help the user stay in DSL land and therefore maybe confuse them less.
>>>>>>>>
>>>>>>>> Best Jan
>>>>>>>>
>>>>>>>> #DeathToIQMoreAndBetterConnectors :)
>>>>>>>>
>>>>>>>> On 27.01.2017 16:51, Damian Guy wrote:
>>>>>>>>
>>>>>>>>> I think Jan is saying that they don't always need to be materialized, i.e., filter just needs to apply the ValueGetter; it doesn't need yet another physical state store.
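Damian's point that filter only needs to apply the value getter can be illustrated with a small chain of getters: only the source is backed by a map, while filter and mapValues compute their result on each read. The `ValueGetter` interface and the `Getters` helpers below are hypothetical and simplified; they are not the internal Kafka Streams interfaces.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, simplified value-getter interface (not Kafka's internal API).
interface ValueGetter<V> {
    V get(String key);
}

final class Getters {
    private Getters() {}

    // Backed by a physical store: the only place where data is actually kept.
    static <V> ValueGetter<V> fromStore(Map<String, V> store) {
        return store::get;
    }

    // filter(): no store of its own; the predicate is evaluated on every read.
    static <V> ValueGetter<V> filter(ValueGetter<V> parent, Predicate<V> predicate) {
        return key -> {
            V value = parent.get(key);
            return (value != null && predicate.test(value)) ? value : null;
        };
    }

    // mapValues(): likewise computed on read from the parent's current value.
    static <V, R> ValueGetter<R> mapValues(ValueGetter<V> parent, Function<V, R> mapper) {
        return key -> {
            V value = parent.get(key);
            return value == null ? null : mapper.apply(value);
        };
    }
}
```

Because the derived getters hold no data of their own, an update to the source map is visible through them immediately, which is the property Jan relies on when he argues that a filtered table could be queried without a second physical store.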
>>>>>>>>>
>>>>>>>>> On Fri, 27 Jan 2017 at 15:49 Michael Noll <mich...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> Like Damian, and for the same reasons, I am more in favor of overloading methods rather than introducing `materialize()`. FWIW, we already have a similar API setup for e.g. `KTable#through(topicName, stateStoreName)`.
>>>>>>>>>>
>>>>>>>>>> A related but slightly different question is what e.g. Jan Filipiak mentioned earlier in this thread: I think we need to explain more clearly why KIP-114 doesn't propose the seemingly simpler solution of always materializing tables/state stores.
>>>>>>>>>>
>>>>>>>>>> On Fri, Jan 27, 2017 at 4:38 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Yeah, it's confusing. Why shouldn't it be queryable by IQ? If you use the ValueGetter of filter, it will apply the filter, and it should be completely transparent whether another processor or IQ is accessing it. How can this new method help?
>>>>>>>>>>>
>>>>>>>>>>> I cannot see why the additional materialize method is required! Hence I suggest leaving it alone. Regarding removing the others, I don't have strong opinions, and it seems to be unrelated.
>>>>>>>>>>>
>>>>>>>>>>> Best Jan
>>>>>>>>>>>
>>>>>>>>>>> On 26.01.2017 20:48, Eno Thereska wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Forwarding this thread to the users list too, in case people would like to comment. It is also on the dev list.
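Michael's question of why not always materialize depends on which operators force materialization in the first place. The rules Guozhang lists in this thread (aggregation and join results are materialized, inputs of joins and table aggregations must be, filter/mapValues follow their parent) can be encoded as a small recursive check. This is a hypothetical model: `Op` and `TableNode` are invented names, and treating source tables as materialized only when something downstream needs them is an assumption based on the remark that `builder.table()` results are materialized lazily.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator kinds; the rules below follow the thread's discussion,
// not the actual Kafka Streams implementation.
enum Op { SOURCE, AGGREGATE, JOIN, FILTER, MAP_VALUES }

final class TableNode {
    final Op op;
    final List<TableNode> parents = new ArrayList<>();
    boolean requiredByDownstream; // set when a join or aggregation consumes this table

    TableNode(Op op, TableNode... parents) {
        this.op = op;
        for (TableNode p : parents) {
            this.parents.add(p);
            // Rules (b) and (c): inputs of joins and table aggregations must be materialized.
            if (op == Op.JOIN || op == Op.AGGREGATE) {
                p.requiredByDownstream = true;
            }
        }
    }

    boolean isMaterialized() {
        switch (op) {
            case AGGREGATE:
            case JOIN:
                return true; // rules (a) and (c): results are always materialized
            case FILTER:
            case MAP_VALUES:
                // Rule (d): follows the (single) parent, unless a consumer forces it.
                return requiredByDownstream || parents.get(0).isMaterialized();
            case SOURCE:
            default:
                return requiredByDownstream; // assumption: materialized lazily, only when needed
        }
    }
}
```

An aggregation with no table parents stands in here for a KStream aggregation; a filter on top of it is then materialized by rule (d), while a filter on an unused source table is not.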
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Eno
>>>>>>>>>>>>
>>>>>>>>>>>> Begin forwarded message:
>>>>>>>>>>>>
>>>>>>>>>>>>> From: "Matthias J. Sax" <matth...@confluent.io>
>>>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved semantics
>>>>>>>>>>>>> Date: 24 January 2017 at 19:30:10 GMT
>>>>>>>>>>>>> To: dev@kafka.apache.org
>>>>>>>>>>>>> Reply-To: dev@kafka.apache.org
>>>>>>>>>>>>>
>>>>>>>>>>>>> That's not what I meant by "huge impact".
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am referring to the actions related to materializing a KTable: creating a RocksDB store and a changelog topic -- users should be aware of the runtime implications, and this is better expressed by an explicit method call than implicitly triggered by using a different overload of a method.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 1/24/17 1:35 AM, Damian Guy wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think your definition of a huge impact and mine are rather different ;-P Overloading a few methods is not really a huge impact IMO. It is also a sacrifice worth making for the readability and usability of the API.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I understand your argument, but do not agree with it.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Your first version (even if the "flow" is not as nice) is more explicit than the second version. Adding a stateStoreName parameter is quite implicit but has a huge impact -- thus, I prefer the more verbose but explicit version.
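To make the "runtime implications" Matthias mentions concrete, the following sketch lists what materializing one table sets up: a local store name and its backing changelog topic. Kafka Streams names changelog topics `<application.id>-<storeName>-changelog` (the "appID-state1-changelog" example later in the thread follows this pattern), and changelogs of key-value stores are log-compacted. `MaterializationPlan` and its override argument are hypothetical, loosely modeled on the proposed `KTable#withChangelogConfig(Map config)`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical summary of what materializing one KTable entails:
// a local (e.g. RocksDB) store plus a changelog topic that backs it up.
final class MaterializationPlan {
    final String storeName;
    final String changelogTopic;
    final Map<String, String> changelogConfig;

    private MaterializationPlan(String storeName, String changelogTopic,
                                Map<String, String> changelogConfig) {
        this.storeName = storeName;
        this.changelogTopic = changelogTopic;
        this.changelogConfig = Collections.unmodifiableMap(changelogConfig);
    }

    // Changelog topics follow "<application.id>-<storeName>-changelog";
    // the per-table overrides mimic the hypothetical withChangelogConfig(Map) idea.
    static MaterializationPlan of(String applicationId, String storeName,
                                  Map<String, String> overrides) {
        Map<String, String> config = new HashMap<>();
        config.put("cleanup.policy", "compact"); // key-value store changelogs are compacted
        config.putAll(overrides);
        return new MaterializationPlan(storeName,
                applicationId + "-" + storeName + "-changelog", config);
    }
}
```

Seen this way, every materialized table costs a local store plus a replicated topic on the brokers, which is why Matthias wants that decision to be visible in the API rather than implied by an overload.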
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 1/23/17 1:39 AM, Damian Guy wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm not a fan of materialize. I think it interrupts the flow, i.e.,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> table.mapValues(..).materialize().join(..).materialize()
>>>>>>>>>>>>>>>> compared to:
>>>>>>>>>>>>>>>> table.mapValues(..).join(..)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I know which one I prefer. My preference is still to provide overloaded methods where people can specify the store names if they want; otherwise we just generate them.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> thanks for the KIP Eno! Here are my 2 cents:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1) I like Guozhang's proposal about removing the store name from all KTable methods and generating internal names (however, I would do this as overloads). Furthermore, I would not force users to call .materialize() if they want to query a store, but add one more method, .stateStoreName(), that returns the store name if the KTable is materialized. Thus, .materialize() also does not necessarily need a storeName parameter (i.e., we should have some overloads here).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I would also not allow providing a null store name (to indicate no materialization if not necessary), but throw an exception instead.
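Damian's preferred style (overloads with an optional store name), combined with Matthias's suggestion to reject a null name instead of reading it as "don't materialize", could look roughly like this. `SketchTable` and the generated-name format are hypothetical; they are not Kafka's real classes or naming scheme, and the predicate is never evaluated because the sketch only concerns naming.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical table type showing the overload style: one overload without a
// store name (the library generates one) and one whose store name must not be null.
final class SketchTable<V> {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    final String storeName;
    final boolean userNamed;

    private SketchTable(String storeName, boolean userNamed) {
        this.storeName = storeName;
        this.userNamed = userNamed;
    }

    static <T> SketchTable<T> source() {
        return new SketchTable<>(internalName("SOURCE"), false);
    }

    // No store name given: generate an internal one (not exposed for IQ).
    SketchTable<V> filter(Predicate<V> predicate) {
        return new SketchTable<>(internalName("FILTER"), false);
    }

    // Store name given: it must be non-null rather than meaning "don't materialize".
    SketchTable<V> filter(Predicate<V> predicate, String storeName) {
        Objects.requireNonNull(storeName, "storeName must not be null; use filter(predicate) instead");
        return new SketchTable<>(storeName, true);
    }

    // Hypothetical naming scheme for generated stores.
    private static String internalName(String op) {
        return String.format("KTABLE-%s-STATE-STORE-%010d", op, COUNTER.getAndIncrement());
    }
}
```

Rejecting null keeps the two overloads unambiguous: the short form always means "name it for me", and the long form always means "I intend to query this store".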
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This yields some simplification (see below).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2) I also like Guozhang's proposal about KStream#toTable().
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 3)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 3. What will happen when you call materialize on a KTable that is already materialized? Will it create another StateStore (provided the name is different), or throw an Exception?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Currently an exception is thrown, but see below.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If we follow approach (1) from Guozhang, there is no need to worry about a second materialization, and no exception needs to be thrown. A call to .materialize() basically sets a "materialized flag" (i.e., an idempotent operation) and sets a new name.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 4)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Not sure whether that is really required. We also use `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and don't care about the "K" prefix.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Eno's reply:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I think changing it to `toKStream` would make it absolutely clear what we are converting it to.
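Matthias's answer to point 3 above, that `.materialize()` just sets an idempotent flag and optionally a new name, can be sketched as follows. `MaterializableTable` is a hypothetical class; the constructor argument stands for an internally generated name that a user-supplied one replaces, as Guozhang also proposes in this thread.

```java
// Hypothetical sketch: materialize() only flips a flag (so repeated calls are
// harmless) and, when a name is given, replaces the internally generated one.
final class MaterializableTable {
    private boolean queryable;  // the user asked to be able to query this table
    private String storeName;   // internal name until the user provides one

    MaterializableTable(String internalName) {
        this.storeName = internalName;
    }

    MaterializableTable materialize() {
        queryable = true;        // idempotent: calling it again changes nothing
        return this;
    }

    MaterializableTable materialize(String userStoreName) {
        if (userStoreName == null) {
            throw new NullPointerException("storeName must not be null");
        }
        queryable = true;
        storeName = userStoreName; // a later call simply sets a new name
        return this;
    }

    boolean isQueryable() {
        return queryable;
    }

    String stateStoreName() {
        return storeName;
    }
}
```

Under this model there is no "already materialized" error state at all, which is the simplification Matthias refers to.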
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I'd say we should probably change the KStreamBuilder methods (but not in this KIP).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I would keep #toStream() (see below).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 5) We should not remove any methods but only deprecate them.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> A general note:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I do not understand your comments in "Rejected Alternatives". You say "Have the KTable be the materialized view" was rejected. But your KIP actually does exactly this -- after those changes, the changelog abstraction of KTable is secondary, and the "view" abstraction is what a KTable is. And just to be clear, I like this a lot:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>  - it aligns with the name KTable
>>>>>>>>>>>>>>>>>  - it aligns with stream-table duality
>>>>>>>>>>>>>>>>>  - it aligns with IQ
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I would say that a KTable is a "view abstraction" (as materialization is optional).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 1/22/17 5:05 PM, Guozhang Wang wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for the KIP Eno, I have a few meta comments and a few detailed comments:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 1.
>>>>>>>>>>>>>>>>>> I like the materialize() function in general, but I would like to see how other KTable functions should be updated accordingly. For example, 1) KStreamBuilder.table(..) has a state store name parameter, and we will always materialize the KTable unless its state store name is set to null; 2) KTable.agg requires the resulting KTable to be materialized, and hence it also has a state store name; 3) KTable.join requires the joining table to be materialized. And today we do not actually have a mechanism to enforce that, but will only throw an exception at runtime if it is not (e.g. if you have "builder.table("topic", null).join()", an RTE will be thrown).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I'd make an extended proposal just to kick off the discussion here: let's remove all the state store params in other KTable functions, and if in some cases a KTable has to be materialized (e.g.
>>>>>>>>>>>>>>>>>> KTables resulting from KXX.agg) and users do not call materialize(), then we treat it as "users are not interested in querying it at all" and hence use an internally generated name for the materialized KTable; i.e. although it is materialized, the state store is not exposed to users. And if users call materialize() afterwards but we have already decided to materialize it, we can replace the internal name with the user-provided name. Then, from a user's point of view, if they ever want to query a KTable, they have to call materialize() with a given state store name. This approach has one awkwardness, though: serdes and state store name params are not separated and could overlap (see detailed comment #2 below).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2.
>>>>>>>>>>>>>>>>>> This step does not need to be included in this KIP, but just as a reference / future work: as we have discussed before, we may enforce materializing KTable.join results as well in the future. If we do that, then:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     a) KTables resulting from KXX.agg are always materialized;
>>>>>>>>>>>>>>>>>>     b) KTable.agg requires the aggregating KTable to always be materialized (otherwise we would not know the old value);
>>>>>>>>>>>>>>>>>>     c) KTables resulting from KTable.join are always materialized, and so are the joining KTables;
>>>>>>>>>>>>>>>>>>     d) the materialization of KTables resulting from KTable.filter/mapValues depends on their parent's materialization.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> By recursive induction all KTables are actually always materialized, and then the effect of "materialize()" is just to specify the state store names. In this scenario, we no longer need to send Change<V> in repartition topics within joins, but only in repartition topics within aggregations.
>>>>>>>>>>>>>>>>>> Instead, we can just send a "tombstone" without the old value, and we do not need to calculate joins twice (one more time when the old value is received).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 3. I'm wondering if it is worthwhile to add a "KStream#toTable()" function, which is interpreted as a dummy aggregation where the new value always replaces the old value. I have seen a couple of use cases for this; for example, users want to read a changelog topic, apply some filters, and then materialize it into a KTable with state stores without creating duplicated changelog topics. With materialize() and toTable, I'd imagine users can specify something
>>>>>>>>>>>>>>>>>> like:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> "
>>>>>>>>>>>>>>>>>> KStream stream = builder.stream("topic1").filter(..);
>>>>>>>>>>>>>>>>>> KTable table = stream.toTable(..);
>>>>>>>>>>>>>>>>>> table.materialize("state1");
>>>>>>>>>>>>>>>>>> "
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> And the library in this case could set the changelog topic of store "state1" to be "topic1", and apply the filter on the fly while (re-)storing its state by reading from this topic, instead of creating a second changelog topic like "appID-state1-changelog", which is a semi-duplicate of "topic1".
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Detailed:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 1. I'm +1 with Michael regarding "#toStream"; actually I was thinking about renaming it to "#toChangeLog", but after thinking a bit more I think #toStream is still better, and we can just mention in the JavaDoc that it transforms its underlying changelog stream into a normal stream.
>>>>>>>>>>>>>>>>>> 2. As Damian mentioned, there are a few scenarios where the serdes are already specified in a previous operation, whereas in others they are not known before calling materialize, for example: stream.groupByKey.agg(serde).materialize(serde) vs.
>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> table.mapValues(/*no >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> serde specified*/).materialize(serde). We need to specify >>>>>>>>>>>>>>>> what are >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>> handling logic here. >>>>>>>>>>>>>>>>>> 3. We can remove "KTable#to" call as well, and enforce >> users >>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>> call " >>>>>>>>>>>>>>>>>> KTable.toStream.to" to be more clear. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska < >>>>>>>>>>>>>>>>>> eno.there...@gmail.com> >>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I think changing it to `toKStream` would make it >> absolutely >>>>>>>>>>>>>>>>>> clear >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> what >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>>> are converting it to. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I'd say we should probably change the KStreamBuilder >> methods >>>>>>>>>>>>>>>>>>> (but >>>>>>>>>>>>>>>>>>> not >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> this KIP). >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks >>>>>>>>>>>>>>>>>>> Eno >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On 17 Jan 2017, at 13:59, Michael Noll < >>>>>>>>>>>>>>>>>>> mich...@confluent.io> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Rename toStream() to toKStream() for consistency. >>>>>>>>>>>>>>>>>>>> Not sure whether that is really required. We also use >>>>>>>>>>>>>>>>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, >> for >>>>>>>>>>>>>>>>>>>> example, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> don't care about the "K" prefix. 
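Guozhang's suggestion above, `KStream#toTable()` as a dummy aggregation where each new value replaces the old one, plus restoring "state1" by replaying "topic1" through the same filter instead of writing "appID-state1-changelog", can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not Kafka Streams code: `ToTableModel`, `Rec`, `filter`, `toTable`, and `restoreFromSource` are hypothetical names, a topic is modeled as an in-memory list, and a `null` value is treated as a tombstone.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical model, NOT the Kafka Streams API: a "topic" is an ordered list
// of key/value records, and a null value is a tombstone (delete).
final class ToTableModel {

    static final class Rec {
        final String key;
        final String value; // null means tombstone

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stream-side filter: records failing the predicate are dropped, as with a
    // stream filter. Assumption of this sketch: tombstones bypass the predicate.
    static List<Rec> filter(List<Rec> stream, BiPredicate<String, String> pred) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : stream) {
            if (r.value == null || pred.test(r.key, r.value)) {
                out.add(r);
            }
        }
        return out;
    }

    // toTable() as a dummy aggregation: each new value replaces the old one
    // for its key ("latest value wins"); a tombstone removes the key.
    static Map<String, String> toTable(List<Rec> stream) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Rec r : stream) {
            if (r.value == null) {
                table.remove(r.key);
            } else {
                table.put(r.key, r.value);
            }
        }
        return table;
    }

    // Restoring the store without a second changelog topic: replay the source
    // topic and re-apply the same filter on the fly. Because filter and toTable
    // are deterministic, this rebuilds the state that live processing built.
    static Map<String, String> restoreFromSource(List<Rec> sourceTopic,
                                                 BiPredicate<String, String> pred) {
        return toTable(filter(sourceTopic, pred));
    }
}
```

For example, replaying the hypothetical records `a=1`, `b=-5`, `a=3`, `b=null` with a "value is positive" filter yields a table holding only `a=3`, the same state live processing would have built. That determinism is what the proposal relies on when it treats the source topic as the store's changelog; letting tombstones bypass the filter is a choice made for this sketch only.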
On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska <eno.there...@gmail.com> wrote:

Thanks Damian, answers inline:

> On 16 Jan 2017, at 17:17, Damian Guy <damian....@gmail.com> wrote:
>
> Hi Eno,
>
> Thanks for the KIP. Some comments:
>
> 1. I'd probably rename materialized to materialize.

Ok.

> 2. I don't think the addition of the new log compaction mechanism is necessary for this KIP, i.e., the KIP is useful without it. Maybe that should be a different KIP?

Agreed, already removed. Will do a separate KIP for that.

> 3. What will happen when you call materialize on a KTable that is already materialized? Will it create another StateStore (provided the name is different), or throw an Exception?

Currently an exception is thrown, but see below.

> 4. Have you considered overloading the existing KTable operations to add a state store name? So if a state store name is provided, then materialize a state store? This would be my preferred approach, as I don't think materialize is always a valid operation.

Ok, I can see your point. This will increase the KIP size since I'll need to enumerate all overloaded methods, but it's not a problem.

> 5. The materialize method will need a value Serde, as some operations, i.e., mapValues, join, etc., can change the value types.
>
> 6. https://issues.apache.org/jira/browse/KAFKA-4609 might mean that we always need to materialize the StateStore for KTable-KTable joins. If that is the case, then the KTable join operators will also need Serde information.

I'll update the KIP with the serdes.
Thanks
Eno

> Cheers,
> Damian
>
> On Mon, 16 Jan 2017 at 16:44 Eno Thereska <eno.there...@gmail.com> wrote:
>
>> Hello,
>>
>> We created "KIP-114: KTable materialization and improved semantics" to solidify the KTable semantics in Kafka Streams:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
>>
>> Your feedback is appreciated.
>>
>> Thanks
>> Eno
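Damian's alternative in point 4 (overload the existing KTable operations with a state store name and materialize only when one is given), together with the Serde concern from point 5 and Guozhang's `mapValues` example, can be sketched in plain Java. Everything here is hypothetical and deliberately simplified, not the Kafka Streams API: `SketchTable` stands in for a KTable, `ValueCodec` for a value Serde, and the static `stores` map for the queryable state stores; rejecting a reused store name is also an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for a value Serde: turns values into bytes and back.
interface ValueCodec<V> {
    byte[] encode(V value);
    V decode(byte[] bytes);
}

// Hypothetical, heavily simplified KTable: an in-memory key/value snapshot.
final class SketchTable<V> {
    // Named "state stores" holding serialized values (what IQ would query).
    static final Map<String, Map<String, byte[]>> stores = new HashMap<>();

    final Map<String, V> rows;

    SketchTable(Map<String, V> rows) {
        this.rows = rows;
    }

    // Overload without a store name: nothing is materialized.
    SketchTable<V> filter(Predicate<V> pred) {
        Map<String, V> out = new LinkedHashMap<>();
        rows.forEach((k, v) -> { if (pred.test(v)) out.put(k, v); });
        return new SketchTable<>(out);
    }

    // Overload with a store name: the result is also materialized, which
    // needs a codec because the store holds serialized values.
    SketchTable<V> filter(Predicate<V> pred, String storeName, ValueCodec<V> codec) {
        return materialize(filter(pred), storeName, codec);
    }

    // mapValues can change the value type (V -> V2), so a codec for the NEW
    // type must be supplied when the result is materialized. Only the
    // materializing overload is shown here.
    <V2> SketchTable<V2> mapValues(Function<V, V2> fn, String storeName, ValueCodec<V2> codec) {
        Map<String, V2> out = new LinkedHashMap<>();
        rows.forEach((k, v) -> out.put(k, fn.apply(v)));
        return materialize(new SketchTable<>(out), storeName, codec);
    }

    private static <T> SketchTable<T> materialize(SketchTable<T> t, String name, ValueCodec<T> codec) {
        // Assumption for this sketch: a store name may be used only once.
        if (stores.containsKey(name)) {
            throw new IllegalStateException("store already exists: " + name);
        }
        Map<String, byte[]> store = new LinkedHashMap<>();
        t.rows.forEach((k, v) -> store.put(k, codec.encode(v)));
        stores.put(name, store);
        return t;
    }
}
```

With this shape, `filter(pred)` materializes nothing, while `filter(pred, "long-fruit", codec)` also registers a named store. The `mapValues` overload shows why the codec has to be supplied where the result is materialized: the value type changes from `V` to `V2`, so a codec for the old type cannot encode the new values.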