Like Damian, and for the same reasons, I favor overloading methods over introducing `materialize()`. FWIW, we already have a similar API setup in e.g. `KTable#through(topicName, stateStoreName)`.
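To make the comparison concrete, here is a toy sketch of the two API shapes under discussion. It is not Kafka Streams code: `ToyTable`, `MaterializeStyles`, the method bodies, and the store name `mapped-store` are all hypothetical, and rejecting a null store name is an assumption of the sketch. It only illustrates that both styles can end up with the same named store, and that the difference is where the name is supplied.

```java
import java.util.Objects;
import java.util.Optional;

// Toy stand-in for a table in a topology. NOT the Kafka Streams KTable:
// every name here is hypothetical and exists only to compare API shapes.
final class ToyTable {
    private final String lineage;              // readable chain of operations
    private final Optional<String> storeName;  // present only if queryable by name

    private ToyTable(String lineage, Optional<String> storeName) {
        this.lineage = lineage;
        this.storeName = storeName;
    }

    static ToyTable source(String topic) {
        return new ToyTable("source(" + topic + ")", Optional.empty());
    }

    // Overload style: no store name given, so nothing is exposed for queries.
    ToyTable mapValues() {
        return new ToyTable(lineage + ".mapValues", Optional.empty());
    }

    // Overload style: passing a store name is what triggers materialization.
    // Rejecting null is an assumption of this sketch.
    ToyTable mapValues(String storeName) {
        Objects.requireNonNull(storeName, "storeName");
        return new ToyTable(lineage + ".mapValues", Optional.of(storeName));
    }

    // Explicit style: a separate step names the store of the current table.
    ToyTable materialize(String storeName) {
        Objects.requireNonNull(storeName, "storeName");
        return new ToyTable(lineage, Optional.of(storeName));
    }

    Optional<String> storeName() {
        return storeName;
    }

    String lineage() {
        return lineage;
    }
}

final class MaterializeStyles {
    public static void main(String[] args) {
        // Same logical result, reached through the two API shapes.
        ToyTable overloaded = ToyTable.source("topic").mapValues("mapped-store");
        ToyTable explicit = ToyTable.source("topic").mapValues().materialize("mapped-store");

        System.out.println(overloaded.lineage() + " -> " + overloaded.storeName());
        System.out.println(explicit.lineage() + " -> " + explicit.storeName());
    }
}
```

Both chains end with a store named `mapped-store`; the overload keeps the chain shorter, while the explicit call makes the materialization step visible in the code.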
A related but slightly different question is what e.g. Jan Filipiak mentioned earlier in this thread: I think we need to explain more clearly why KIP-114 doesn't propose the seemingly simpler solution of always materializing tables/state stores.

On Fri, Jan 27, 2017 at 4:38 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

> Hi,
>
> Yeah its confusing, Why shoudn't it be querable by IQ? If you uses the
> ValueGetter of Filter it will apply the filter and should be completely
> transparent as to if another processor or IQ is accessing it? How can this
> new method help?
>
> I cannot see the reason for the additional materialize method being
> required! Hence I suggest leave it alone.
> regarding removing the others I dont have strong opinions and it seems to
> be unrelated.
>
> Best Jan
>
> On 26.01.2017 20:48, Eno Thereska wrote:
>
>> Forwarding this thread to the users list too in case people would like to
>> comment. It is also on the dev list.
>>
>> Thanks
>> Eno
>>
>> Begin forwarded message:
>>
>>> From: "Matthias J. Sax" <matth...@confluent.io>
>>> Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved
>>> semantics
>>> Date: 24 January 2017 at 19:30:10 GMT
>>> To: dev@kafka.apache.org
>>> Reply-To: dev@kafka.apache.org
>>>
>>> That not what I meant by "huge impact".
>>>
>>> I refer to the actions related to materialize a KTable: creating a
>>> RocksDB store and a changelog topic -- users should be aware about
>>> runtime implication and this is better expressed by an explicit method
>>> call, rather than implicitly triggered by using a different overload of
>>> a method.
>>>
>>> -Matthias
>>>
>>> On 1/24/17 1:35 AM, Damian Guy wrote:
>>>
>>>> I think your definition of a huge impact and mine are rather different
>>>> ;-P
>>>> Overloading a few methods is not really a huge impact IMO. It is also a
>>>> sacrifice worth making for readability, usability of the API.
>>>>
>>>> On Mon, 23 Jan 2017 at 17:55 Matthias J.
>>>> Sax <matth...@confluent.io> wrote:
>>>>
>>>>> I understand your argument, but do not agree with it.
>>>>>
>>>>> Your first version (even if the "flow" is not as nice) is more explicit
>>>>> than the second version. Adding a stateStoreName parameter is quite
>>>>> implicit but has a huge impact -- thus, I prefer the rather more verbose
>>>>> but explicit version.
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 1/23/17 1:39 AM, Damian Guy wrote:
>>>>>
>>>>>> I'm not a fan of materialize. I think it interrupts the flow, i.e,
>>>>>>
>>>>>> table.mapValue(..).materialize().join(..).materialize()
>>>>>> compared to:
>>>>>> table.mapValues(..).join(..)
>>>>>>
>>>>>> I know which one i prefer.
>>>>>> My preference is stil to provide overloaded methods where people can
>>>>>> specify the store names if they want, otherwise we just generate them.
>>>>>>
>>>>>> On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> thanks for the KIP Eno! Here are my 2 cents:
>>>>>>>
>>>>>>> 1) I like Guozhang's proposal about removing store name from all KTable
>>>>>>> methods and generate internal names (however, I would do this as
>>>>>>> overloads). Furthermore, I would not force users to call .materialize()
>>>>>>> if they want to query a store, but add one more method .stateStoreName()
>>>>>>> that returns the store name if the KTable is materialized. Thus, also
>>>>>>> .materialize() must not necessarily have a parameter storeName (ie, we
>>>>>>> should have some overloads here).
>>>>>>>
>>>>>>> I would also not allow to provide a null store name (to indicate no
>>>>>>> materialization if not necessary) but throw an exception.
>>>>>>>
>>>>>>> This yields some simplification (see below).
>>>>>>>
>>>>>>> 2) I also like Guozhang's proposal about KStream#toTable()
>>>>>>>
>>>>>>> 3)
>>>>>>>
>>>>>>>>> 3.
>>>>>>>>> What will happen when you call materialize on KTable that is already
>>>>>>>>> materialized? Will it create another StateStore (providing the name is
>>>>>>>>> different), throw an Exception?
>>>>>>>>
>>>>>>>> Currently an exception is thrown, but see below.
>>>>>>>
>>>>>>> If we follow approach (1) from Guozhang, there is no need to worry about
>>>>>>> a second materialization and also no exception must be throws. A call to
>>>>>>> .materialize() basically sets a "materialized flag" (ie, idempotent
>>>>>>> operation) and sets a new name.
>>>>>>>
>>>>>>> 4)
>>>>>>>
>>>>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>>>>
>>>>>>>> Not sure whether that is really required. We also use
>>>>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and
>>>>>>>> don't care about the "K" prefix.
>>>>>>>
>>>>>>> Eno's reply:
>>>>>>>
>>>>>>>> I think changing it to `toKStream` would make it absolutely clear what
>>>>>>>> we are converting it to.
>>>>>>>> I'd say we should probably change the KStreamBuilder methods (but not in
>>>>>>>> this KIP).
>>>>>>>
>>>>>>> I would keep #toStream(). (see below)
>>>>>>>
>>>>>>> 5) We should not remove any methods but only deprecate them.
>>>>>>>
>>>>>>> A general note:
>>>>>>>
>>>>>>> I do not understand your comments "Rejected Alternatives". You say "Have
>>>>>>> the KTable be the materialized view" was rejected. But your KIP actually
>>>>>>> does exactly this -- the changelog abstraction of KTable is secondary
>>>>>>> after those changes and the "view" abstraction is what a KTable is. And
>>>>>>> just to be clear, I like this a lot:
>>>>>>>
>>>>>>> - it aligns with the name KTable
>>>>>>> - is aligns with stream-table-duality
>>>>>>> - it aligns with IQ
>>>>>>>
>>>>>>> I would say that a KTable is a "view abstraction" (as materialization is
>>>>>>> optional).
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 1/22/17 5:05 PM, Guozhang Wang wrote:
>>>>>>>
>>>>>>>> Thanks for the KIP Eno, I have a few meta comments and a few detailed
>>>>>>>> comments:
>>>>>>>>
>>>>>>>> 1. I like the materialize() function in general, but I would like to see
>>>>>>>> how other KTable functions should be updated accordingly. For example, 1)
>>>>>>>> KStreamBuilder.table(..) has a state store name parameter, and we will
>>>>>>>> always materialize the KTable unless its state store name is set to null;
>>>>>>>> 2) KTable.agg requires the result KTable to be materialized, and hence it
>>>>>>>> also have a state store name; 3) KTable.join requires the joining table to
>>>>>>>> be materialized. And today we do not actually have a mechanism to enforce
>>>>>>>> that, but will only throw an exception at runtime if it is not (e.g. if you
>>>>>>>> have "builder.table("topic", null).join()" a RTE will be thrown).
>>>>>>>>
>>>>>>>> I'd make an extended proposal just to kick off the discussion here: let's
>>>>>>>> remove all the state store params in other KTable functions, and if in some
>>>>>>>> cases KTable have to be materialized (e.g.
>>>>>>>> KTable resulted from KXX.agg)
>>>>>>>> and users do not call materialize(), then we treat it as "users are not
>>>>>>>> interested in querying it at all" and hence use an internal name generated
>>>>>>>> for the materialized KTable; i.e. although it is materialized the state
>>>>>>>> store is not exposed to users. And if users call materialize() afterwards
>>>>>>>> but we have already decided to materialize it, we can replace the internal
>>>>>>>> name with the user's provided names. Then from a user's point-view, if they
>>>>>>>> ever want to query a KTable, they have to call materialize() with a given
>>>>>>>> state store name. This approach has one awkwardness though, that serdes and
>>>>>>>> state store names param are not separated and could be overlapped (see
>>>>>>>> detailed comment #2 below).
>>>>>>>>
>>>>>>>> 2. This step does not need to be included in this KIP, but just as a
>>>>>>>> reference / future work: as we have discussed before, we may enforce
>>>>>>>> materialize KTable.join resulted KTables as well in the future. If we do
>>>>>>>> that, then:
>>>>>>>>
>>>>>>>> a) KXX.agg resulted KTables are always materialized;
>>>>>>>> b) KTable.agg requires the aggregating KTable to always be materialized
>>>>>>>> (otherwise we would not know the old value);
>>>>>>>> c) KTable.join resulted KTables are always materialized, and so are the
>>>>>>>> joining KTables to always be materialized.
>>>>>>>> d) KTable.filter/mapValues resulted KTables materialization depend on its
>>>>>>>> parent's materialization;
>>>>>>>>
>>>>>>>> By recursive induction all KTables are actually always materialized, and
>>>>>>>> then the effect of the "materialize()" is just for specifying the state
>>>>>>>> store names. In this scenario, we do not need to send Change<V> in
>>>>>>>> repartition topics within joins any more, but only for repartitions topics
>>>>>>>> within aggregations. Instead, we can just send a "tombstone" without the
>>>>>>>> old value and we do not need to calculate joins twice (one more time when
>>>>>>>> old value is received).
>>>>>>>>
>>>>>>>> 3. I'm wondering if it is worth-while to add a "KStream#toTable()" function
>>>>>>>> which is interpreted as a dummy-aggregation where the new value always
>>>>>>>> replaces the old value. I have seen a couple of use cases of this, for
>>>>>>>> example, users want to read a changelog topic, apply some filters, and then
>>>>>>>> materialize it into a KTable with state stores without creating duplicated
>>>>>>>> changelog topics. With materialize() and toTable I'd imagine users can
>>>>>>>> specify sth.
>>>>>>>> like:
>>>>>>>>
>>>>>>>> "
>>>>>>>> KStream stream = builder.stream("topic1").filter(..);
>>>>>>>> KTable table = stream.toTable(..);
>>>>>>>> table.materialize("state1");
>>>>>>>> "
>>>>>>>>
>>>>>>>> And the library in this case could set store "state1" 's changelog topic to
>>>>>>>> be "topic1", and applying the filter on the fly while (re-)storing its
>>>>>>>> state by reading from this topic, instead of creating a second changelog
>>>>>>>> topic like "appID-state1-changelog" which is a semi-duplicate of "topic1".
>>>>>>>>
>>>>>>>> Detailed:
>>>>>>>>
>>>>>>>> 1. I'm +1 with Michael regarding "#toStream"; actually I was thinking about
>>>>>>>> renaming to "#toChangeLog" but after thinking a bit more I think #toStream
>>>>>>>> is still better, and we can just mention in the javaDoc that it is
>>>>>>>> transforming its underlying changelog stream to a normal stream.
>>>>>>>> 2. As Damian mentioned, there are a few scenarios where the serdes are
>>>>>>>> already specified in a previous operation whereas it is not known before
>>>>>>>> calling materialize, for example:
>>>>>>>> stream.groupByKey.agg(serde).materialize(serde) v.s. table.mapValues(/*no
>>>>>>>> serde specified*/).materialize(serde). We need to specify what are the
>>>>>>>> handling logic here.
>>>>>>>> 3. We can remove "KTable#to" call as well, and enforce users to call
>>>>>>>> "KTable.toStream.to" to be more clear.
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I think changing it to `toKStream` would make it absolutely clear what we
>>>>>>>>> are converting it to.
>>>>>>>>>
>>>>>>>>> I'd say we should probably change the KStreamBuilder methods (but not in
>>>>>>>>> this KIP).
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Eno
>>>>>>>>>
>>>>>>>>> On 17 Jan 2017, at 13:59, Michael Noll <mich...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>>>>>>
>>>>>>>>>> Not sure whether that is really required. We also use
>>>>>>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and
>>>>>>>>>> don't care about the "K" prefix.
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Damian, answers inline:
>>>>>>>>>>>
>>>>>>>>>>> On 16 Jan 2017, at 17:17, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Eno,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the KIP. Some comments:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. I'd probably rename materialized to materialize.
>>>>>>>>>>>
>>>>>>>>>>> Ok.
>>>>>>>>>>>
>>>>>>>>>>>> 2. I don't think the addition of the new Log compaction mechanism is
>>>>>>>>>>>> necessary for this KIP, i.e, the KIP is useful without it. Maybe that
>>>>>>>>>>>> should be a different KIP?
>>>>>>>>>>>
>>>>>>>>>>> Agreed, already removed. Will do a separate KIP for that.
>>>>>>>>>>>
>>>>>>>>>>>> 3. What will happen when you call materialize on KTable that is already
>>>>>>>>>>>> materialized? Will it create another StateStore (providing the name is
>>>>>>>>>>>> different), throw an Exception?
>>>>>>>>>>>
>>>>>>>>>>> Currently an exception is thrown, but see below.
>>>>>>>>>>>
>>>>>>>>>>>> 4.
>>>>>>>>>>>> Have you considered overloading the existing KTable operations to add
>>>>>>>>>>>> a state store name? So if a state store name is provided, then materialize
>>>>>>>>>>>> a state store? This would be my preferred approach as i don't think
>>>>>>>>>>>> materialize is always a valid operation.
>>>>>>>>>>>
>>>>>>>>>>> Ok I can see your point. This will increase the KIP size since I'll need
>>>>>>>>>>> to enumerate all overloaded methods, but it's not a problem.
>>>>>>>>>>>
>>>>>>>>>>>> 5. The materialize method will need ta value Serde as some operations,
>>>>>>>>>>>> i.e., mapValues, join etc can change the value types
>>>>>>>>>>>> 6. https://issues.apache.org/jira/browse/KAFKA-4609 - might mean that we
>>>>>>>>>>>> always need to materialize the StateStore for KTable-KTable joins. If that
>>>>>>>>>>>> is the case, then the KTable Join operators will also need Serde
>>>>>>>>>>>> information.
>>>>>>>>>>>
>>>>>>>>>>> I'll update the KIP with the serdes.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Eno
>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Damian
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, 16 Jan 2017 at 16:44 Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>
>>>>>>>>>>>>> We created "KIP-114: KTable materialization and improved semantics" to
>>>>>>>>>>>>> solidify the KTable semantics in Kafka Streams:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-114:+KTable+materialization+and+improved+semantics>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Your feedback is appreciated.
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Eno