Forwarding this thread to the users list too in case people would like to comment. It is also on the dev list.
Thanks
Eno

> Begin forwarded message:
>
> From: "Matthias J. Sax" <matth...@confluent.io>
> Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved semantics
> Date: 24 January 2017 at 19:30:10 GMT
> To: d...@kafka.apache.org
> Reply-To: d...@kafka.apache.org
>
> That's not what I meant by "huge impact".
>
> I refer to the actions related to materializing a KTable: creating a
> RocksDB store and a changelog topic -- users should be aware of the
> runtime implications, and this is better expressed by an explicit method
> call, rather than implicitly triggered by using a different overload of
> a method.
>
> -Matthias
>
> On 1/24/17 1:35 AM, Damian Guy wrote:
>> I think your definition of a huge impact and mine are rather different ;-P
>> Overloading a few methods is not really a huge impact IMO. It is also a
>> sacrifice worth making for readability, usability of the API.
>>
>> On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax <matth...@confluent.io> wrote:
>>
>>> I understand your argument, but do not agree with it.
>>>
>>> Your first version (even if the "flow" is not as nice) is more explicit
>>> than the second version. Adding a stateStoreName parameter is quite
>>> implicit but has a huge impact -- thus, I prefer the rather more verbose
>>> but explicit version.
>>>
>>> -Matthias
>>>
>>> On 1/23/17 1:39 AM, Damian Guy wrote:
>>>> I'm not a fan of materialize. I think it interrupts the flow, i.e.,
>>>>
>>>> table.mapValues(..).materialize().join(..).materialize()
>>>> compared to:
>>>> table.mapValues(..).join(..)
>>>>
>>>> I know which one I prefer.
>>>> My preference is still to provide overloaded methods where people can
>>>> specify the store names if they want, otherwise we just generate them.
>>>>
>>>> On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> thanks for the KIP Eno!
>>>>> Here are my 2 cents:
>>>>>
>>>>> 1) I like Guozhang's proposal about removing the store name from all KTable
>>>>> methods and generating internal names (however, I would do this as
>>>>> overloads). Furthermore, I would not force users to call .materialize()
>>>>> if they want to query a store, but add one more method .stateStoreName()
>>>>> that returns the store name if the KTable is materialized. Thus,
>>>>> .materialize() does not necessarily need a storeName parameter (i.e., we
>>>>> should have some overloads here).
>>>>>
>>>>> I would also not allow a null store name (to indicate no
>>>>> materialization if not necessary) but throw an exception instead.
>>>>>
>>>>> This yields some simplification (see below).
>>>>>
>>>>> 2) I also like Guozhang's proposal about KStream#toTable()
>>>>>
>>>>> 3)
>>>>>>> 3. What will happen when you call materialize on KTable that is already
>>>>>>> materialized? Will it create another StateStore (providing the name is
>>>>>>> different), throw an Exception?
>>>>>>
>>>>>> Currently an exception is thrown, but see below.
>>>>>
>>>>> If we follow approach (1) from Guozhang, there is no need to worry about
>>>>> a second materialization and also no exception needs to be thrown. A call to
>>>>> .materialize() basically sets a "materialized flag" (i.e., an idempotent
>>>>> operation) and sets a new name.
>>>>>
>>>>> 4)
>>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>>
>>>>>> Not sure whether that is really required. We also use
>>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and
>>>>>> don't care about the "K" prefix.
>>>>>
>>>>> Eno's reply:
>>>>>> I think changing it to `toKStream` would make it absolutely clear what
>>>>>> we are converting it to.
>>>>>>
>>>>>> I'd say we should probably change the KStreamBuilder methods (but not in
>>>>>> this KIP).
>>>>>
>>>>> I would keep #toStream().
>>>>> (see below)
>>>>>
>>>>> 5) We should not remove any methods but only deprecate them.
>>>>>
>>>>> A general note:
>>>>>
>>>>> I do not understand your comments under "Rejected Alternatives". You say "Have
>>>>> the KTable be the materialized view" was rejected. But your KIP actually
>>>>> does exactly this -- the changelog abstraction of KTable is secondary
>>>>> after those changes and the "view" abstraction is what a KTable is. And
>>>>> just to be clear, I like this a lot:
>>>>>
>>>>> - it aligns with the name KTable
>>>>> - it aligns with stream-table-duality
>>>>> - it aligns with IQ
>>>>>
>>>>> I would say that a KTable is a "view abstraction" (as materialization is
>>>>> optional).
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 1/22/17 5:05 PM, Guozhang Wang wrote:
>>>>>> Thanks for the KIP Eno, I have a few meta comments and a few detailed
>>>>>> comments:
>>>>>>
>>>>>> 1. I like the materialize() function in general, but I would like to see
>>>>>> how other KTable functions should be updated accordingly. For example, 1)
>>>>>> KStreamBuilder.table(..) has a state store name parameter, and we will
>>>>>> always materialize the KTable unless its state store name is set to null;
>>>>>> 2) KTable.agg requires the result KTable to be materialized, and hence it
>>>>>> also has a state store name; 3) KTable.join requires the joining table to
>>>>>> be materialized. And today we do not actually have a mechanism to enforce
>>>>>> that, but will only throw an exception at runtime if it is not (e.g. if you
>>>>>> have "builder.table("topic", null).join()" a RTE will be thrown).
>>>>>>
>>>>>> I'd make an extended proposal just to kick off the discussion here: let's
>>>>>> remove all the state store params in other KTable functions, and if in some
>>>>>> cases a KTable has to be materialized (e.g. a KTable resulting from KXX.agg)
>>>>>> and users do not call materialize(), then we treat it as "users are not
>>>>>> interested in querying it at all" and hence use an internal name generated
>>>>>> for the materialized KTable; i.e. although it is materialized the state
>>>>>> store is not exposed to users. And if users call materialize() afterwards
>>>>>> but we have already decided to materialize it, we can replace the internal
>>>>>> name with the user-provided name. Then from a user's point of view, if they
>>>>>> ever want to query a KTable, they have to call materialize() with a given
>>>>>> state store name. This approach has one awkwardness though, that serdes and
>>>>>> state store name params are not separated and could overlap (see
>>>>>> detailed comment #2 below).
>>>>>>
>>>>>> 2. This step does not need to be included in this KIP, but just as a
>>>>>> reference / future work: as we have discussed before, we may enforce
>>>>>> materializing KTable.join resulted KTables as well in the future. If we do
>>>>>> that, then:
>>>>>>
>>>>>> a) KXX.agg resulted KTables are always materialized;
>>>>>> b) KTable.agg requires the aggregating KTable to always be materialized
>>>>>> (otherwise we would not know the old value);
>>>>>> c) KTable.join resulted KTables are always materialized, and so are the
>>>>>> joining KTables;
>>>>>> d) KTable.filter/mapValues resulted KTables' materialization depends on
>>>>>> their parent's materialization.
>>>>>>
>>>>>> By recursive induction all KTables are actually always materialized, and
>>>>>> then the effect of the "materialize()" is just for specifying the state
>>>>>> store names. In this scenario, we do not need to send Change<V> in
>>>>>> repartition topics within joins any more, but only for repartition topics
>>>>>> within aggregations.
>>>>>> Instead, we can just send a "tombstone" without the
>>>>>> old value and we do not need to calculate joins twice (one more time when
>>>>>> the old value is received).
>>>>>>
>>>>>> 3. I'm wondering if it is worthwhile to add a "KStream#toTable()" function
>>>>>> which is interpreted as a dummy-aggregation where the new value always
>>>>>> replaces the old value. I have seen a couple of use cases of this, for
>>>>>> example, users want to read a changelog topic, apply some filters, and then
>>>>>> materialize it into a KTable with state stores without creating duplicated
>>>>>> changelog topics. With materialize() and toTable I'd imagine users can
>>>>>> specify something like:
>>>>>>
>>>>>> "
>>>>>> KStream stream = builder.stream("topic1").filter(..);
>>>>>> KTable table = stream.toTable(..);
>>>>>> table.materialize("state1");
>>>>>> "
>>>>>>
>>>>>> And the library in this case could set the changelog topic of store "state1" to
>>>>>> be "topic1", and apply the filter on the fly while (re-)storing its
>>>>>> state by reading from this topic, instead of creating a second changelog
>>>>>> topic like "appID-state1-changelog" which is a semi-duplicate of "topic1".
>>>>>>
>>>>>> Detailed:
>>>>>>
>>>>>> 1. I'm +1 with Michael regarding "#toStream"; actually I was thinking about
>>>>>> renaming to "#toChangeLog" but after thinking a bit more I think #toStream
>>>>>> is still better, and we can just mention in the javaDoc that it is
>>>>>> transforming its underlying changelog stream to a normal stream.
>>>>>> 2. As Damian mentioned, there are a few scenarios where the serdes are
>>>>>> already specified in a previous operation whereas it is not known before
>>>>>> calling materialize, for example:
>>>>>> stream.groupByKey.agg(serde).materialize(serde) vs. table.mapValues(/*no
>>>>>> serde specified*/).materialize(serde). We need to specify what the
>>>>>> handling logic is here.
>>>>>> 3. We can remove the "KTable#to" call as well, and require users to call
>>>>>> "KTable.toStream.to" to be more clear.
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska <eno.there...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I think changing it to `toKStream` would make it absolutely clear what we
>>>>>>> are converting it to.
>>>>>>>
>>>>>>> I'd say we should probably change the KStreamBuilder methods (but not in
>>>>>>> this KIP).
>>>>>>>
>>>>>>> Thanks
>>>>>>> Eno
>>>>>>>
>>>>>>>> On 17 Jan 2017, at 13:59, Michael Noll <mich...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>>>>
>>>>>>>> Not sure whether that is really required. We also use
>>>>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and
>>>>>>>> don't care about the "K" prefix.
>>>>>>>>
>>>>>>>> On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska <eno.there...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Damian, answers inline:
>>>>>>>>>
>>>>>>>>>> On 16 Jan 2017, at 17:17, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Eno,
>>>>>>>>>>
>>>>>>>>>> Thanks for the KIP. Some comments:
>>>>>>>>>>
>>>>>>>>>> 1. I'd probably rename materialized to materialize.
>>>>>>>>>
>>>>>>>>> Ok.
>>>>>>>>>
>>>>>>>>>> 2. I don't think the addition of the new Log compaction mechanism is
>>>>>>>>>> necessary for this KIP, i.e., the KIP is useful without it. Maybe that
>>>>>>>>>> should be a different KIP?
>>>>>>>>>
>>>>>>>>> Agreed, already removed. Will do a separate KIP for that.
>>>>>>>>>
>>>>>>>>>> 3. What will happen when you call materialize on KTable that is already
>>>>>>>>>> materialized? Will it create another StateStore (providing the name is
>>>>>>>>>> different), throw an Exception?
>>>>>>>>>
>>>>>>>>> Currently an exception is thrown, but see below.
>>>>>>>>>
>>>>>>>>>> 4. Have you considered overloading the existing KTable operations to add
>>>>>>>>>> a state store name? So if a state store name is provided, then materialize
>>>>>>>>>> a state store? This would be my preferred approach as I don't think
>>>>>>>>>> materialize is always a valid operation.
>>>>>>>>>
>>>>>>>>> Ok I can see your point. This will increase the KIP size since I'll need
>>>>>>>>> to enumerate all overloaded methods, but it's not a problem.
>>>>>>>>>
>>>>>>>>>> 5. The materialize method will need a value Serde as some operations,
>>>>>>>>>> i.e., mapValues, join etc. can change the value types
>>>>>>>>>> 6. https://issues.apache.org/jira/browse/KAFKA-4609 - might mean that we
>>>>>>>>>> always need to materialize the StateStore for KTable-KTable joins. If that
>>>>>>>>>> is the case, then the KTable Join operators will also need Serde
>>>>>>>>>> information.
>>>>>>>>>
>>>>>>>>> I'll update the KIP with the serdes.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Eno
>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Damian
>>>>>>>>>>
>>>>>>>>>> On Mon, 16 Jan 2017 at 16:44 Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> We created "KIP-114: KTable materialization and improved semantics" to
>>>>>>>>>>> solidify the KTable semantics in Kafka Streams:
>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-114:+KTable+materialization+and+improved+semantics>
>>>>>>>>>>>
>>>>>>>>>>> Your feedback is appreciated.
>>>>>>>>>>> Thanks
>>>>>>>>>>> Eno
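
For anyone skimming the thread, the semantics debated above (internal store names generated by default, materialize() as an idempotent call that may replace the name, a null name rejected with an exception, and stateStoreName() for lookup) can be sketched as a small toy model in Java. ToyTable and every method on it are hypothetical stand-ins invented for illustration; this is not the Kafka Streams API and not necessarily what the KIP will end up specifying.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of the semantics discussed in this thread.
// ToyTable is hypothetical; it is NOT the Kafka Streams KTable API.
final class ToyTable {
    private static final AtomicInteger NEXT_ID = new AtomicInteger();

    private boolean materialized;
    private String storeName; // null while the table is not materialized

    private ToyTable(boolean materialized, String storeName) {
        this.materialized = materialized;
        this.storeName = storeName;
    }

    // A table that needs no state yet (e.g. the result of mapValues).
    static ToyTable stateless() {
        return new ToyTable(false, null);
    }

    // A table that must be materialized (e.g. an aggregation result);
    // it gets a generated internal name because the user gave none.
    static ToyTable stateful() {
        return new ToyTable(true, generateName());
    }

    private static String generateName() {
        return "internal-store-" + NEXT_ID.incrementAndGet();
    }

    // Overload without a name: idempotent, keeps any existing name.
    ToyTable materialize() {
        if (!materialized) {
            materialized = true;
            storeName = generateName();
        }
        return this;
    }

    // Overload with a name: sets the flag and replaces the current name.
    // A null name is rejected rather than meaning "do not materialize".
    ToyTable materialize(String name) {
        storeName = Objects.requireNonNull(name, "store name must not be null");
        materialized = true;
        return this;
    }

    boolean isMaterialized() {
        return materialized;
    }

    // Returns the store name, or null if the table is not materialized.
    String stateStoreName() {
        return storeName;
    }

    public static void main(String[] args) {
        ToyTable agg = ToyTable.stateful();
        System.out.println(agg.stateStoreName());        // generated internal name
        agg.materialize("counts").materialize("counts"); // repeated call changes nothing
        System.out.println(agg.stateStoreName());        // user-provided name
        System.out.println(ToyTable.stateless().stateStoreName());
    }
}
```

In this sketch a second materialize() never throws, which mirrors the "materialized flag" idea from Matthias's point 3, and the fluent return value keeps the chained style Damian prefers available for either API shape.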