How about something a bit different? We could pass builders to all the KTable methods, so we have something like this:

ktable.filter(new FilteredTableBuilder(predicate)
        .materializeAs(someName))

ktable.join(new TableJoinBuilder(otherTable, valueJoiner)
        .left()
        .materializeAs("join-store")
        .withCaching()
        .withLogging(logConfig));

etc.

We could then deprecate all existing methods on KTable and eventually remove them, such that we have no overloaded methods. Additionally, using the builder pattern gives us the opportunity to provide a couple of other things people have asked for, i.e., control over how the changelog etc. topics are created, and per-store caching. There are probably other things, too.

On Mon, 30 Jan 2017 at 08:15 Matthias J. Sax <matth...@confluent.io> wrote:

> cc from user list
>
>
> -------- Forwarded Message --------
> Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved semantics
> Date: Mon, 30 Jan 2017 00:06:37 -0800
> From: Matthias J. Sax <matth...@confluent.io>
> Organization: Confluent Inc
> To: us...@kafka.apache.org
>
> I understand point (1) about when materialization happens. But I cannot follow your conclusion about how this should influence the DSL because I don't see a functional difference in "provide a store name in an overload" vs "call .materialize()" -- both mechanisms can do the exact same thing.
>
> I also do not understand why we need to force users to specify a store name for using IQ. Even if store names are used internally, we can completely abstract this away from users.
>
> To me, the question about DSL design should be reduced to what a developer cares about. And most likely, she does not care about internals -- if one wants to query a KTable the usage of the "name" is an unnecessary detour in the thought process.
>
> Currently, the code would be something like this:
>
> > KStreamBuilder builder = ...
> > KTable<String,Long> table = ... // requires to specify "storeName"
> > KafkaStreams streams = ...
> >
> > ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
> > String key = "some-key";
> > Long someValue = localStore.get(key);
>
> but I think we can simplify this to
>
> > KStreamBuilder builder = ...
> > KTable<String,Long> table = ... // no "storeName" required -- can be generated internally (if user cares, user can optionally provide storeName)
> > KafkaStreams streams = ...
> >
> > String key = "some-key";
> > Long someValue = table.get(key);
>
> Of course, the call to table.get() is only valid if the store can be queried. But the same holds for the current approach -- if streams.store() gets a wrong storeName or wrong QueryableStoreTypes it will not find a store and fail -- thus we can also fail if a non-queryable KTable gets queried. Furthermore, the user does not need to worry about the potentially confusing QueryableStoreTypes parameter (if a KTable gets queried the type is fixed anyway).
>
> Thus, we don't need to force a user to specify a store name. Furthermore, we can add all kinds of handy methods for the user like:
>
> KTable#withStoreName(String storeName)
> KTable#isQueryable()
> KTable#getStoreName() // if somebody has DIY IQ code
>
> and if we want to allow to force materialization
>
> KTable#materialize()
> KTable#materialize(String storeName)
>
> In a different JIRA there is the idea to allow people to provide changelog configs. That could also easily be handled with new methods (instead of more overloads).
>
> KTable#withChangelogConfig(Map config)
>
>
> The main point I want to bring up is that this is a two-fold discussion: (1) DSL design itself and (2) KTable materialization and IQ strategy. And both seem to be independent. Nevertheless, because (2) might require API changes, we should discuss both together to avoid two steps of API changes.
>
>
> -Matthias
>
>
> On 1/29/17 7:56 PM, Guozhang Wang wrote:
> > Thinking out loud here about the API options (materialize v.s. overloaded functions) and its impact on IQ:
> >
> > 1. The first issue of the current DSL is that there is inconsistency upon whether / how KTables should be materialized:
> >
> >    a) in many cases the library HAS TO materialize KTables no matter what, e.g. KStream / KTable aggregation resulted KTables, and hence we enforce users to provide store names and throw RTE if it is null;
> >    b) in some other cases, the KTable can be materialized or not; for example in KStreamBuilder.table(), store names can be nullable and in which case the KTable would not be materialized;
> >    c) in some other cases, the KTable will never be materialized, for example KTable.filter() resulted KTables, and users have no options to enforce them to be materialized;
> >    d) this is related to a), where some KTables are required to be materialized, but we do not enforce users to provide a state store name, e.g. KTables involved in joins; a RTE will be thrown not immediately but later in this case.
> >
> > 2. The second issue is related to IQ, where state stores are accessed by their state store names; so only those KTables that have user-specified state stores will be queryable. But because of 1) above, many stores may not be of interest to users for IQ but they still need to provide a (dummy?) state store name for them; while on the other hand users cannot query some state stores, e.g. the ones generated by KTable.filter() as there are no APIs for them to specify a state store name.
> >
> > 3.
> > We are aware from user feedback that such backend details would better be abstracted away from the DSL layer, where app developers should just focus on processing logic, while state stores along with their changelogs etc would better be in a different mechanism; same arguments have been discussed for serdes / windowing triggers as well. For serdes specifically, we had a very long discussion about it and concluded that, at least in Java7, we cannot completely abstract serde away in the DSL, so we choose the other extreme to enforce users to be completely aware of the serde requirements when some KTables may need to be materialized via overloaded API functions. While for the state store names, I feel it is a different argument than serdes (details below).
> >
> >
> > So to me, for either materialize() v.s. overloaded functions directions, the first thing I'd like to resolve is the inconsistency issue mentioned above. So in either case: KTable materialization will not be affected by user providing state store name or not, but will only be decided by the library when it is necessary. More specifically, only join operator and builder.table() resulted KTables are not always materialized, but are still likely to be materialized lazily (e.g. when participated in a join operator).
> >
> >
> > For overloaded functions that would mean:
> >
> >    a) we have an overloaded function for ALL operators that could result in a KTable, and allow it to be null (i.e. for the function without this param it is null by default);
> >    b) null-state-store-name does not indicate that a KTable would not be materialized, but that it will not be used for IQ at all (internal state store names will be generated when necessary).
> >
> >
> > For materialize() that would mean:
> >
> >    a) we will remove state store names from ALL operators that could result in a KTable.
> >    b) KTables not calling materialize() do not indicate that a KTable would not be materialized, but that it will not be used for IQ at all (internal state store names will be generated when necessary).
> >
> >
> > Again, in either way the API itself does not "hint" about anything for materializing a KTable or not at all; it is still purely determined by the library when parsing the DSL for now.
> >
> > Following these thoughts, I feel that 1) we should probably change the name "materialize" since it may be misleading to users as to what actually happened behind the scenes, to e.g. Damian suggested "queryableStore(String storeName)", which returns a QueryableStateStore, and can replace the `KafkaStreams.store` function; 2) comparing those two options assuming we get rid of the misleading function name, I personally favor not adding more overloading functions as it keeps the API simpler.
> >
> >
> >
> > Guozhang
> >
> >
> > On Sat, Jan 28, 2017 at 2:32 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
> >
> >> Hi,
> >>
> >> thanks for your mail, felt like this can clarify some things! The thread unfortunately split but as all branches close in on what my suggestion was about I'll pick this to continue
> >>
> >> Of course only the table the user wants to query would be materialized. (retrieving the query handle implies materialisation). So in the example of KTable::filter if you call getIQHandle on both tables only the one source that is there would materialize and the QueryHandle abstraction would make sure it gets mapped and filtered and what not upon read as usual.
> >>
> >> Of course the object you would retrieve would maybe only wrap the storeName / table unique identifier and a way to access the streams instance and then basically uses the same mechanism that is currently used. From my point of view this is the least confusing way for DSL users.
> >> If it's too tricky to get a hand on the streams instance one could ask the user to pass it in before executing queries, therefore making sure the streams instance has been built.
> >>
> >> The effort to implement this is indeed some orders of magnitude higher than the overloaded materialized call. As long as I could help getting a different view I am happy.
> >>
> >> Best Jan
> >>
> >>
> >> On 28.01.2017 09:36, Eno Thereska wrote:
> >>
> >>> Hi Jan,
> >>>
> >>> I understand your concern. One implication of not passing any store name and just getting an IQ handle is that all KTables would need to be materialised. Currently the store name (or proposed .materialize() call) acts as a hint on whether to materialise the KTable or not. Materialising every KTable can be expensive, although there are some tricks one can play, e.g., have a virtual store rather than one backed by a Kafka topic.
> >>>
> >>> However, even with the above, after getting an IQ handle, the user would still need to use IQ APIs to query the state. As such, we would still continue to be outside the original DSL so this wouldn't address your original concern.
> >>>
> >>> So I read this suggestion as simplifying the APIs by removing the store name, at the cost of having to materialise every KTable. It's definitely an option we'll consider as part of this KIP.
> >>>
> >>> Thanks
> >>> Eno
> >>>
> >>>
> >>> On 28 Jan 2017, at 06:49, Jan Filipiak <jan.filip...@trivago.com> wrote:
> >>>
> >>>> Hi Exactly
> >>>>
> >>>> I know it works from the Processor API, but my suggestion would prevent DSL users dealing with store names whatsoever.
> >>>>
> >>>> In general I am pro switching between DSL and Processor API easily. (In my Stream applications I do this a lot with reflection and instantiating KTableImpl) Concerning this KIP all I say is that there should be a DSL concept of "I want to expose this __KTable__". This can be a method like KTable::retrieveIQHandle():InteractiveQueryHandle, the table would know to materialize, and the user had a reference to the "store and the distributed query mechanism by the Interactive Query Handle"; under the hood it can use the same mechanism as the PAPI people again.
> >>>>
> >>>> I hope you see my point :)
> >>>>
> >>>> Best Jan
> >>>>
> >>>>
> >>>> #DeathToIQMoreAndBetterConnectors :)
> >>>>
> >>>>
> >>>> On 27.01.2017 21:59, Matthias J. Sax wrote:
> >>>>
> >>>>> Jan,
> >>>>>
> >>>>> the IQ feature is not limited to Streams DSL but can also be used for Stores used in PAPI. Thus, we need a mechanism that does work for PAPI and DSL.
> >>>>>
> >>>>> Nevertheless I see your point and I think we could provide a better API for KTable stores including the discovery of remote shards of the same KTable.
> >>>>>
> >>>>> @Michael: Yes, right now we do have a lot of overloads and I am not a big fan of those -- I would rather prefer a builder pattern. But that might be a different discussion (nevertheless, if we would aim for an API rework, we should get the changes with regard to stores right from the beginning on, in order to avoid a redesign later on.)
> >>>>>
> >>>>> something like:
> >>>>>
> >>>>> stream.groupByKey()
> >>>>>       .window(TimeWindow.of(5000))
> >>>>>       .aggregate(...)
> >>>>>       .withAggValueSerde(new CustomTypeSerde())
> >>>>>       .withStoreName("storeName");
> >>>>>
> >>>>>
> >>>>> (This would also reduce JavaDoc redundancy -- maybe a personal pain point right now :))
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 1/27/17 11:10 AM, Jan Filipiak wrote:
> >>>>>
> >>>>>> Yeah,
> >>>>>>
> >>>>>> Maybe my bad that I refuse to look into IQ as I don't find them anywhere close to being interesting. The problem IMO is that people need to know the store name, so we are working on different levels to achieve a single goal.
> >>>>>>
> >>>>>> What is your people's opinion on having a method on KTable that returns them something like a key-value store? There are of course problems like "it can't be used before the stream threads are going and group membership is established..." but the benefit would be that for the user there is a consistent way of saying "Hey I need it materialized as queries gonna be coming" + already get a thing that he can execute the queries on in 1 step.
> >>>>>> What I think is unintuitive here is you need to say materialize on this KTable and then you go somewhere else and find its store name and then you go to the KafkaStreams instance and ask for the store with this name.
> >>>>>>
> >>>>>> So one could help the user to stay in DSL land and therefore maybe confuse him less.
> >>>>>>
> >>>>>> Best Jan
> >>>>>>
> >>>>>> #DeathToIQMoreAndBetterConnectors :)
> >>>>>>
> >>>>>>
> >>>>>> On 27.01.2017 16:51, Damian Guy wrote:
> >>>>>>
> >>>>>>> I think Jan is saying that they don't always need to be materialized, i.e., filter just needs to apply the ValueGetter, it doesn't need yet another physical state store.
> >>>>>>>
> >>>>>>> On Fri, 27 Jan 2017 at 15:49 Michael Noll <mich...@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> Like Damian, and for the same reasons, I am more in favor of overloading methods rather than introducing `materialize()`.
> >>>>>>>> FWIW, we already have a similar API setup for e.g. `KTable#through(topicName, stateStoreName)`.
> >>>>>>>>
> >>>>>>>> A related but slightly different question is what e.g. Jan Filipiak mentioned earlier in this thread:
> >>>>>>>> I think we need to explain more clearly why KIP-114 doesn't propose the seemingly simpler solution of always materializing tables/state stores.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Fri, Jan 27, 2017 at 4:38 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> Yeah it's confusing. Why shouldn't it be queryable by IQ? If you use the ValueGetter of Filter it will apply the filter and should be completely transparent as to if another processor or IQ is accessing it? How can this new method help?
> >>>>>>>>>
> >>>>>>>>> I cannot see the reason for the additional materialize method being required! Hence I suggest leave it alone.
> >>>>>>>>> Regarding removing the others I don't have strong opinions and it seems to be unrelated.
> >>>>>>>>>
> >>>>>>>>> Best Jan
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 26.01.2017 20:48, Eno Thereska wrote:
> >>>>>>>>>
> >>>>>>>>>> Forwarding this thread to the users list too in case people would like to comment. It is also on the dev list.
> >>>>>>>>>>
> >>>>>>>>>> Thanks
> >>>>>>>>>> Eno
> >>>>>>>>>>
> >>>>>>>>>> Begin forwarded message:
> >>>>>>>>>>
> >>>>>>>>>>> From: "Matthias J. Sax" <matth...@confluent.io>
> >>>>>>>>>>> Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved semantics
> >>>>>>>>>>> Date: 24 January 2017 at 19:30:10 GMT
> >>>>>>>>>>> To: dev@kafka.apache.org
> >>>>>>>>>>> Reply-To: dev@kafka.apache.org
> >>>>>>>>>>>
> >>>>>>>>>>> That's not what I meant by "huge impact".
> >>>>>>>>>>>
> >>>>>>>>>>> I refer to the actions related to materializing a KTable: creating a RocksDB store and a changelog topic -- users should be aware of the runtime implications and this is better expressed by an explicit method call, rather than implicitly triggered by using a different overload of a method.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>> On 1/24/17 1:35 AM, Damian Guy wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I think your definition of a huge impact and mine are rather different ;-P
> >>>>>>>>>>>> Overloading a few methods is not really a huge impact IMO. It is also a sacrifice worth making for readability, usability of the API.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> I understand your argument, but do not agree with it.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Your first version (even if the "flow" is not as nice) is more explicit than the second version. Adding a stateStoreName parameter is quite implicit but has a huge impact -- thus, I prefer the rather more verbose but explicit version.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 1/23/17 1:39 AM, Damian Guy wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> I'm not a fan of materialize.
> >>>>>>>>>>>>>> I think it interrupts the flow, i.e,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> table.mapValues(..).materialize().join(..).materialize()
> >>>>>>>>>>>>>> compared to:
> >>>>>>>>>>>>>> table.mapValues(..).join(..)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I know which one I prefer.
> >>>>>>>>>>>>>> My preference is still to provide overloaded methods where people can specify the store names if they want, otherwise we just generate them.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> thanks for the KIP Eno! Here are my 2 cents:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1) I like Guozhang's proposal about removing store name from all KTable methods and generate internal names (however, I would do this as overloads). Furthermore, I would not force users to call .materialize() if they want to query a store, but add one more method .stateStoreName() that returns the store name if the KTable is materialized. Thus, also .materialize() must not necessarily have a parameter storeName (ie, we should have some overloads here).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I would also not allow to provide a null store name (to indicate no materialization if not necessary) but throw an exception.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> This yields some simplification (see below).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2) I also like Guozhang's proposal about KStream#toTable()
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 3)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 3. What will happen when you call materialize on KTable that is already materialized? Will it create another StateStore (providing the name is different), throw an Exception?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Currently an exception is thrown, but see below.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> If we follow approach (1) from Guozhang, there is no need to worry about a second materialization and also no exception must be thrown. A call to .materialize() basically sets a "materialized flag" (ie, idempotent operation) and sets a new name.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 4)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Rename toStream() to toKStream() for consistency.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Not sure whether that is really required. We also use `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and don't care about the "K" prefix.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Eno's reply:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I think changing it to `toKStream` would make it absolutely clear what we are converting it to.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I'd say we should probably change the KStreamBuilder methods (but not in this KIP).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I would keep #toStream(). (see below)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 5) We should not remove any methods but only deprecate them.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> A general note:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I do not understand your comments "Rejected Alternatives". You say "Have the KTable be the materialized view" was rejected. But your KIP actually does exactly this -- the changelog abstraction of KTable is secondary after those changes and the "view" abstraction is what a KTable is. And just to be clear, I like this a lot:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> - it aligns with the name KTable
> >>>>>>>>>>>>>>> - it aligns with stream-table-duality
> >>>>>>>>>>>>>>> - it aligns with IQ
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I would say that a KTable is a "view abstraction" (as materialization is optional).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 1/22/17 5:05 PM, Guozhang Wang wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for the KIP Eno, I have a few meta comments and a few detailed comments:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 1.
> >>>>>>>>>>>>>>>> I like the materialize() function in general, but I would like to see how other KTable functions should be updated accordingly. For example, 1) KStreamBuilder.table(..) has a state store name parameter, and we will always materialize the KTable unless its state store name is set to null; 2) KTable.agg requires the result KTable to be materialized, and hence it also has a state store name; 3) KTable.join requires the joining table to be materialized. And today we do not actually have a mechanism to enforce that, but will only throw an exception at runtime if it is not (e.g. if you have "builder.table("topic", null).join()" a RTE will be thrown).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I'd make an extended proposal just to kick off the discussion here: let's remove all the state store params in other KTable functions, and if in some cases KTable has to be materialized (e.g. KTable resulted from KXX.agg) and users do not call materialize(), then we treat it as "users are not interested in querying it at all" and hence use an internal name generated for the materialized KTable; i.e. although it is materialized the state store is not exposed to users. And if users call materialize() afterwards but we have already decided to materialize it, we can replace the internal name with the user's provided names. Then from a user's point of view, if they ever want to query a KTable, they have to call materialize() with a given state store name. This approach has one awkwardness though, that serdes and state store names params are not separated and could be overlapped (see detailed comment #2 below).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 2. This step does not need to be included in this KIP, but just as a reference / future work: as we have discussed before, we may enforce materializing KTable.join resulted KTables as well in the future. If we do that, then:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> a) KXX.agg resulted KTables are always materialized;
> >>>>>>>>>>>>>>>> b) KTable.agg requires the aggregating KTable to always be materialized (otherwise we would not know the old value);
> >>>>>>>>>>>>>>>> c) KTable.join resulted KTables are always materialized, and so are the joining KTables to always be materialized.
> >>>>>>>>>>>>>>>> d) KTable.filter/mapValues resulted KTables materialization depends on its parent's materialization;
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> By recursive induction all KTables are actually always materialized, and then the effect of the "materialize()" is just for specifying the state store names. In this scenario, we do not need to send Change<V> in repartition topics within joins any more, but only for repartition topics within aggregations.
> Instead, we can just send a "tombstone" without the old value, and we do
> not need to calculate joins twice (one more time when the old value is
> received).
>
> 3. I'm wondering if it is worthwhile to add a "KStream#toTable()"
> function which is interpreted as a dummy aggregation where the new value
> always replaces the old value. I have seen a couple of use cases of this;
> for example, users want to read a changelog topic, apply some filters,
> and then materialize it into a KTable with state stores without creating
> duplicated changelog topics. With materialize() and toTable I'd imagine
> users can specify something like:
>
> "
> KStream stream = builder.stream("topic1").filter(..);
> KTable table = stream.toTable(..);
> table.materialize("state1");
> "
>
> And the library in this case could set store "state1"'s changelog topic
> to be "topic1", and apply the filter on the fly while (re-)storing its
> state by reading from this topic, instead of creating a second changelog
> topic like "appID-state1-changelog" which is a semi-duplicate of
> "topic1".
>
> Detailed:
>
> 1. I'm +1 with Michael regarding "#toStream"; actually I was thinking
> about renaming to "#toChangeLog", but after thinking a bit more I think
> #toStream is still better, and we can just mention in the javaDoc that it
> is transforming its underlying changelog stream to a normal stream.
>
> 2. As Damian mentioned, there are a few scenarios where the serdes are
> already specified in a previous operation, whereas in others they are not
> known before calling materialize, for example:
> stream.groupByKey.agg(serde).materialize(serde) vs.
> table.mapValues(/*no serde specified*/).materialize(serde). We need to
> specify what the handling logic is here.
>
> 3. We can remove the "KTable#to" call as well, and require users to call
> "KTable.toStream.to" to be more clear.
>
> Guozhang
>
> On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
>> I think changing it to `toKStream` would make it absolutely clear what
>> we are converting it to.
>>
>> I'd say we should probably change the KStreamBuilder methods (but not in
>> this KIP).
>>
>> Thanks
>> Eno
>>
>> On 17 Jan 2017, at 13:59, Michael Noll <mich...@confluent.io> wrote:
>>
>>> Rename toStream() to toKStream() for consistency.
>>>
>>> Not sure whether that is really required. We also use
>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example,
>>> and don't care about the "K" prefix.
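
The "KStream#toTable()" idea in Guozhang's message above, a dummy aggregation in which the newest value per key always wins, can be illustrated with a small toy model. This is only a sketch under stated assumptions: plain Java collections stand in for the topic and the state store, the class and method names (`ToTableSketch`, `toTable`) are hypothetical, no Kafka Streams API is used, and treating a null value as a delete that bypasses the filter is an assumption of the sketch rather than something the thread specifies.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

/** Toy model only: collections stand in for a topic and a state store. */
public class ToTableSketch {

    /**
     * Replays records in order and keeps the latest value per key.
     * Records rejected by the filter are skipped, which mimics applying
     * the filter on the fly while (re-)storing state from the source topic.
     * Assumption of this sketch: a null value is a tombstone and removes the key.
     */
    public static <K, V> Map<K, V> toTable(List<Map.Entry<K, V>> records,
                                           BiPredicate<K, V> filter) {
        Map<K, V> store = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            K key = record.getKey();
            V value = record.getValue();
            if (value == null) {
                store.remove(key);       // tombstone: delete the key
            } else if (filter.test(key, value)) {
                store.put(key, value);   // new value always replaces the old one
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> topic1 = new ArrayList<>();
        topic1.add(new SimpleEntry<>("a", 1L));
        topic1.add(new SimpleEntry<>("b", -5L));  // rejected by the filter below
        topic1.add(new SimpleEntry<>("a", 7L));   // replaces a=1
        topic1.add(new SimpleEntry<>("c", 3L));
        topic1.add(new SimpleEntry<>("c", null)); // tombstone removes c

        Map<String, Long> state1 = toTable(topic1, (k, v) -> v > 0);
        System.out.println(state1); // prints {a=7}
    }
}
```

Because the store is rebuilt purely by replaying "topic1" through the filter, the model needs no second changelog, which is the saving the message describes.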
>>> On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska <eno.there...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Damian, answers inline:
>>>>
>>>> On 16 Jan 2017, at 17:17, Damian Guy <damian....@gmail.com> wrote:
>>>>
>>>>> Hi Eno,
>>>>>
>>>>> Thanks for the KIP. Some comments:
>>>>>
>>>>> 1. I'd probably rename materialized to materialize.
>>>>
>>>> Ok.
>>>>
>>>>> 2. I don't think the addition of the new Log compaction mechanism is
>>>>> necessary for this KIP, i.e., the KIP is useful without it. Maybe
>>>>> that should be a different KIP?
>>>>
>>>> Agreed, already removed. Will do a separate KIP for that.
>>>>
>>>>> 3. What will happen when you call materialize on a KTable that is
>>>>> already materialized? Will it create another StateStore (providing
>>>>> the name is different), throw an Exception?
>>>>
>>>> Currently an exception is thrown, but see below.
>>>>
>>>>> 4. Have you considered overloading the existing KTable operations to
>>>>> add a state store name? So if a state store name is provided, then
>>>>> materialize a state store? This would be my preferred approach as I
>>>>> don't think materialize is always a valid operation.
>>>>
>>>> Ok I can see your point. This will increase the KIP size since I'll
>>>> need to enumerate all overloaded methods, but it's not a problem.
>>>>
>>>>> 5. The materialize method will need a value Serde, as some
>>>>> operations, i.e., mapValues, join etc., can change the value types.
>>>>>
>>>>> 6. https://issues.apache.org/jira/browse/KAFKA-4609 - might mean that
>>>>> we always need to materialize the StateStore for KTable-KTable joins.
>>>>> If that is the case, then the KTable Join operators will also need
>>>>> Serde information.
>>>>
>>>> I'll update the KIP with the serdes.
>>>>
>>>> Thanks
>>>> Eno
>>>>
>>>>> Cheers,
>>>>> Damian
>>>>>
>>>>> On Mon, 16 Jan 2017 at 16:44 Eno Thereska <eno.there...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> We created "KIP-114: KTable materialization and improved semantics"
>>>>>> to solidify the KTable semantics in Kafka Streams:
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-114:+KTable+materialization+and+improved+semantics>
>>>>>>
>>>>>> Your feedback is appreciated.
>>>>>>
>>>>>> Thanks
>>>>>> Eno
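
Damian's point 4 above, overloading the existing KTable operations so that a state store is materialized only when a store name is supplied, might look roughly like the following toy model. Everything here is hypothetical and for illustration only: `OverloadSketch`, `Topology`, `ToyTable`, and both `filter` overloads are made-up names, values are fixed to `Long` for brevity, and none of it is the Kafka Streams API or the signatures eventually adopted by the KIP.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical toy types for illustration; not the Kafka Streams API. */
public class OverloadSketch {

    /** Registry of named stores, standing in for queryable state stores. */
    public static final class Topology {
        public final Map<String, Map<String, Long>> stores = new HashMap<>();
    }

    public static final class ToyTable {
        private final Topology topology;
        private final Map<String, Long> rows;

        public ToyTable(Topology topology, Map<String, Long> rows) {
            this.topology = topology;
            this.rows = rows;
        }

        /** No store name: the result is computed, but no named store is registered. */
        public ToyTable filter(Predicate<Long> predicate) {
            Map<String, Long> out = new HashMap<>();
            rows.forEach((key, value) -> {
                if (predicate.test(value)) {
                    out.put(key, value);
                }
            });
            return new ToyTable(topology, out);
        }

        /** Store name given: the result is additionally materialized under that name. */
        public ToyTable filter(Predicate<Long> predicate, String storeName) {
            ToyTable result = filter(predicate);
            topology.stores.put(storeName, result.rows);
            return result;
        }
    }

    public static void main(String[] args) {
        Topology topology = new Topology();
        Map<String, Long> rows = new HashMap<>();
        rows.put("a", 1L);
        rows.put("b", 10L);
        ToyTable table = new ToyTable(topology, rows);

        table.filter(v -> v > 5);               // nothing materialized
        table.filter(v -> v > 5, "big-values"); // store registered by name
        System.out.println(topology.stores.keySet()); // prints [big-values]
    }
}
```

The cost Eno mentions is visible even in this toy: every operation needs a second overload, which is why the overloaded methods would have to be enumerated in the KIP.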