That's not what I meant by "huge impact". I am referring to the actions involved in materializing a KTable: creating a RocksDB store and a changelog topic. Users should be aware of these runtime implications, and this is better expressed by an explicit method call than triggered implicitly by using a different overload of a method.
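A minimal Java sketch of that runtime implication follows. It uses hypothetical classes (ToyTopology, ToyTable, MaterializationDemo), not the Kafka Streams API, and assumes only the standard changelog naming convention <application.id>-<storeName>-changelog. The overload style and an explicit materialize() call register the same store and the same changelog topic, so the two styles differ in how visible that cost is at the call site, not in the cost itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical toy model, NOT the Kafka Streams API: it tracks only the
// runtime resources that materializing a table would create.
class ToyTopology {
    final String applicationId;
    final List<String> stores = new ArrayList<>();
    final List<String> changelogTopics = new ArrayList<>();

    ToyTopology(String applicationId) {
        this.applicationId = applicationId;
    }

    // Every materialization adds one local state store (RocksDB by default in
    // Kafka Streams) plus one changelog topic named
    // <application.id>-<storeName>-changelog.
    void addStore(String storeName) {
        stores.add(storeName);
        changelogTopics.add(applicationId + "-" + storeName + "-changelog");
    }
}

class ToyTable {
    private final ToyTopology topology;

    ToyTable(ToyTopology topology) {
        this.topology = topology;
    }

    // Overload style: passing a store name implicitly materializes the result.
    // The mapper is ignored; this toy does not process any data.
    ToyTable mapValues(UnaryOperator<String> mapper, String storeName) {
        topology.addStore(storeName);
        return new ToyTable(topology);
    }

    // No store name: no store and no changelog topic are created.
    ToyTable mapValues(UnaryOperator<String> mapper) {
        return new ToyTable(topology);
    }

    // Explicit style: materialization is a separate, visible call.
    ToyTable materialize(String storeName) {
        topology.addStore(storeName);
        return this;
    }
}

class MaterializationDemo {
    public static void main(String[] args) {
        ToyTopology topology = new ToyTopology("my-app");
        ToyTable source = new ToyTable(topology);
        source.mapValues(String::trim, "trimmed");           // overload style
        source.mapValues(String::trim).materialize("other");  // explicit style
        source.mapValues(String::trim);                        // not materialized
        // prints [my-app-trimmed-changelog, my-app-other-changelog]
        System.out.println(topology.changelogTopics);
    }
}
```

Both materialized results cost exactly one store and one changelog topic each; only the call-site visibility differs.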
-Matthias

On 1/24/17 1:35 AM, Damian Guy wrote:
> I think your definition of a huge impact and mine are rather different ;-P
> Overloading a few methods is not really a huge impact IMO. It is also a
> sacrifice worth making for the readability and usability of the API.
>
> On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax <matth...@confluent.io> wrote:
>
>> I understand your argument, but do not agree with it.
>>
>> Your first version (even if the "flow" is not as nice) is more explicit
>> than the second version. Adding a stateStoreName parameter is quite
>> implicit but has a huge impact; thus, I prefer the more verbose but
>> explicit version.
>>
>>
>> -Matthias
>>
>> On 1/23/17 1:39 AM, Damian Guy wrote:
>>> I'm not a fan of materialize. I think it interrupts the flow, i.e.,
>>>
>>> table.mapValues(..).materialize().join(..).materialize()
>>> compared to:
>>> table.mapValues(..).join(..)
>>>
>>> I know which one I prefer.
>>> My preference is still to provide overloaded methods where people can
>>> specify the store names if they want, otherwise we just generate them.
>>>
>>> On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>>> Hi,
>>>>
>>>> thanks for the KIP Eno! Here are my 2 cents:
>>>>
>>>> 1) I like Guozhang's proposal about removing the store name from all KTable
>>>> methods and generating internal names (however, I would do this as
>>>> overloads). Furthermore, I would not force users to call .materialize()
>>>> if they want to query a store, but add one more method .stateStoreName()
>>>> that returns the store name if the KTable is materialized. Thus,
>>>> .materialize() also need not necessarily have a storeName parameter
>>>> (i.e., we should have some overloads here).
>>>>
>>>> I would also not allow a null store name (to indicate no
>>>> materialization if not necessary) but throw an exception instead.
>>>>
>>>> This yields some simplification (see below).
>>>>
>>>> 2) I also like Guozhang's proposal about KStream#toTable()
>>>>
>>>>
>>>> 3)
>>>>>
>>>>>> 3. What will happen when you call materialize on a KTable that is already
>>>>>> materialized? Will it create another StateStore (provided the name is
>>>>>> different), or throw an Exception?
>>>>>
>>>>> Currently an exception is thrown, but see below.
>>>>
>>>> If we follow approach (1) from Guozhang, there is no need to worry about
>>>> a second materialization, and no exception needs to be thrown. A call to
>>>> .materialize() basically sets a "materialized flag" (i.e., an idempotent
>>>> operation) and sets a new name.
>>>>
>>>>
>>>> 4)
>>>>>
>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>
>>>>> Not sure whether that is really required. We also use
>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and
>>>>> don't care about the "K" prefix.
>>>>
>>>> Eno's reply:
>>>>> I think changing it to `toKStream` would make it absolutely clear what
>>>>> we are converting it to.
>>>>>
>>>>> I'd say we should probably change the KStreamBuilder methods (but not in
>>>>> this KIP).
>>>>
>>>> I would keep #toStream(). (see below)
>>>>
>>>>
>>>> 5) We should not remove any methods but only deprecate them.
>>>>
>>>>
>>>> A general note:
>>>>
>>>> I do not understand your comments in "Rejected Alternatives". You say "Have
>>>> the KTable be the materialized view" was rejected. But your KIP actually
>>>> does exactly this: after those changes, the changelog abstraction of a
>>>> KTable is secondary and the "view" abstraction is what a KTable is. And
>>>> just to be clear, I like this a lot:
>>>>
>>>> - it aligns with the name KTable
>>>> - it aligns with stream-table-duality
>>>> - it aligns with IQ
>>>>
>>>> I would say that a KTable is a "view abstraction" (as materialization is
>>>> optional).
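The semantics proposed in points 1) and 3) above can be sketched as follows. This is a hypothetical model: SketchTable, aggregated(), stateless() and the "internal-store-" prefix are invented for illustration and are not the Kafka Streams API. Results that must be materialized get a generated internal name, materialize() only sets a flag and (re)names the store so a second call never throws, stateStoreName() reports the name once the table is materialized, and a null store name is rejected.

```java
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the proposed semantics; SketchTable and its methods
// are illustrative names, not the Kafka Streams API.
class SketchTable {
    private static final AtomicInteger NEXT_ID = new AtomicInteger();

    private boolean materialized;
    private String storeName;

    // A result that must be materialized (e.g. an aggregation) gets an
    // internally generated store name that the user never has to see.
    static SketchTable aggregated() {
        SketchTable table = new SketchTable();
        table.materialized = true;
        table.storeName = internalName();
        return table;
    }

    // A result that does not need a store (e.g. filter/mapValues).
    static SketchTable stateless() {
        return new SketchTable();
    }

    private static String internalName() {
        return "internal-store-" + NEXT_ID.getAndIncrement();
    }

    // Overload without a name: materialize under a generated name, or keep
    // the current name if the table is already materialized.
    SketchTable materialize() {
        if (!materialized) {
            materialized = true;
            storeName = internalName();
        }
        return this;
    }

    // Idempotent: sets the flag and (re)names the store. A second call never
    // throws; a null name is rejected instead of meaning "not materialized".
    SketchTable materialize(String name) {
        Objects.requireNonNull(name, "store name must not be null");
        materialized = true;
        storeName = name;
        return this;
    }

    // Reports the store name only if the table is materialized.
    Optional<String> stateStoreName() {
        return materialized ? Optional.of(storeName) : Optional.empty();
    }
}

class SketchTableDemo {
    public static void main(String[] args) {
        SketchTable counts = SketchTable.aggregated();
        System.out.println(counts.stateStoreName()); // a generated internal name
        counts.materialize("counts").materialize("counts-v2");
        System.out.println(counts.stateStoreName()); // Optional[counts-v2]
        System.out.println(SketchTable.stateless().stateStoreName()); // Optional.empty
    }
}
```

Making materialize(String) a plain setter is what removes the "already materialized" exception case: calling it twice is just a rename.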
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 1/22/17 5:05 PM, Guozhang Wang wrote:
>>>>> Thanks for the KIP Eno, I have a few meta comments and a few detailed
>>>>> comments:
>>>>>
>>>>> 1. I like the materialize() function in general, but I would like to see
>>>>> how other KTable functions should be updated accordingly. For example, 1)
>>>>> KStreamBuilder.table(..) has a state store name parameter, and we will
>>>>> always materialize the KTable unless its state store name is set to null;
>>>>> 2) KTable.agg requires the result KTable to be materialized, and hence it
>>>>> also has a state store name; 3) KTable.join requires the joining table to
>>>>> be materialized. And today we do not actually have a mechanism to enforce
>>>>> that, but will only throw an exception at runtime if it is not (e.g. if you
>>>>> have "builder.table("topic", null).join()" a RuntimeException will be thrown).
>>>>>
>>>>> I'd make an extended proposal just to kick off the discussion here: let's
>>>>> remove all the state store params in other KTable functions, and if in some
>>>>> cases a KTable has to be materialized (e.g. a KTable resulting from KXX.agg)
>>>>> and users do not call materialize(), then we treat it as "users are not
>>>>> interested in querying it at all" and hence use an internally generated name
>>>>> for the materialized KTable; i.e. although it is materialized, the state
>>>>> store is not exposed to users. And if users call materialize() afterwards
>>>>> but we have already decided to materialize it, we can replace the internal
>>>>> name with the user's provided name. Then from a user's point of view, if they
>>>>> ever want to query a KTable, they have to call materialize() with a given
>>>>> state store name. This approach has one awkwardness though: the serde and
>>>>> state store name params are not separated and could overlap (see
>>>>> detailed comment #2 below).
>>>>>
>>>>>
>>>>> 2. This step does not need to be included in this KIP, but just as a
>>>>> reference / future work: as we have discussed before, we may enforce
>>>>> materializing KTables resulting from KTable.join as well in the future.
>>>>> If we do that, then:
>>>>>
>>>>> a) KTables resulting from KXX.agg are always materialized;
>>>>> b) KTable.agg requires the aggregating KTable to always be materialized
>>>>> (otherwise we would not know the old value);
>>>>> c) KTables resulting from KTable.join are always materialized, and the
>>>>> joining KTables must always be materialized as well;
>>>>> d) whether a KTable resulting from KTable.filter/mapValues is materialized
>>>>> depends on its parent's materialization.
>>>>>
>>>>> By recursive induction all KTables are actually always materialized, and
>>>>> then the effect of "materialize()" is just to specify the state
>>>>> store names. In this scenario, we no longer need to send Change<V> in
>>>>> repartition topics within joins, but only in repartition topics
>>>>> within aggregations. Instead, we can just send a "tombstone" without the
>>>>> old value, and we do not need to calculate joins twice (one more time when
>>>>> the old value is received).
>>>>>
>>>>> 3. I'm wondering if it is worthwhile to add a "KStream#toTable()" function
>>>>> which is interpreted as a dummy aggregation where the new value always
>>>>> replaces the old value. I have seen a couple of use cases for this, for
>>>>> example, users want to read a changelog topic, apply some filters, and then
>>>>> materialize it into a KTable with state stores without creating duplicated
>>>>> changelog topics. With materialize() and toTable I'd imagine users can
>>>>> specify something like:
>>>>>
>>>>> "
>>>>> KStream stream = builder.stream("topic1").filter(..);
>>>>> KTable table = stream.toTable(..);
>>>>> table.materialize("state1");
>>>>> "
>>>>>
>>>>> And the library in this case could set store "state1"'s changelog topic to
>>>>> be "topic1", and apply the filter on the fly while (re-)storing its
>>>>> state by reading from this topic, instead of creating a second changelog
>>>>> topic like "appID-state1-changelog" which is a semi-duplicate of "topic1".
>>>>>
>>>>>
>>>>> Detailed:
>>>>>
>>>>> 1. I'm +1 with Michael regarding "#toStream"; actually I was thinking about
>>>>> renaming it to "#toChangeLog" but after thinking a bit more I think #toStream
>>>>> is still better, and we can just mention in the Javadoc that it
>>>>> transforms its underlying changelog stream into a normal stream.
>>>>> 2. As Damian mentioned, there are a few scenarios where the serdes are
>>>>> already specified in a previous operation, whereas in others they are not
>>>>> known before calling materialize, for example:
>>>>> stream.groupByKey.agg(serde).materialize(serde) vs. table.mapValues(/*no
>>>>> serde specified*/).materialize(serde). We need to specify the
>>>>> handling logic here.
>>>>> 3. We can remove the "KTable#to" call as well, and require users to call
>>>>> "KTable.toStream.to" to be more clear.
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska <eno.there...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I think changing it to `toKStream` would make it absolutely clear what we
>>>>>> are converting it to.
>>>>>>
>>>>>> I'd say we should probably change the KStreamBuilder methods (but not in
>>>>>> this KIP).
>>>>>>
>>>>>> Thanks
>>>>>> Eno
>>>>>>
>>>>>>> On 17 Jan 2017, at 13:59, Michael Noll <mich...@confluent.io> wrote:
>>>>>>>
>>>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>>>
>>>>>>> Not sure whether that is really required. We also use
>>>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and
>>>>>>> don't care about the "K" prefix.
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska <eno.there...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Damian, answers inline:
>>>>>>>>
>>>>>>>>> On 16 Jan 2017, at 17:17, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Eno,
>>>>>>>>>
>>>>>>>>> Thanks for the KIP. Some comments:
>>>>>>>>>
>>>>>>>>> 1. I'd probably rename materialized to materialize.
>>>>>>>>
>>>>>>>> Ok.
>>>>>>>>
>>>>>>>>> 2. I don't think the addition of the new Log compaction mechanism is
>>>>>>>>> necessary for this KIP, i.e., the KIP is useful without it. Maybe that
>>>>>>>>> should be a different KIP?
>>>>>>>>
>>>>>>>> Agreed, already removed. Will do a separate KIP for that.
>>>>>>>>
>>>>>>>>
>>>>>>>>> 3. What will happen when you call materialize on a KTable that is already
>>>>>>>>> materialized? Will it create another StateStore (provided the name is
>>>>>>>>> different), or throw an Exception?
>>>>>>>>
>>>>>>>> Currently an exception is thrown, but see below.
>>>>>>>>
>>>>>>>>
>>>>>>>>> 4. Have you considered overloading the existing KTable operations to add
>>>>>>>>> a state store name? So if a state store name is provided, then materialize
>>>>>>>>> a state store? This would be my preferred approach as I don't think
>>>>>>>>> materialize is always a valid operation.
>>>>>>>>
>>>>>>>> Ok I can see your point. This will increase the KIP size since I'll need
>>>>>>>> to enumerate all overloaded methods, but it's not a problem.
>>>>>>>>
>>>>>>>>> 5. The materialize method will need a value Serde as some operations,
>>>>>>>>> i.e., mapValues, join etc., can change the value types
>>>>>>>>> 6. https://issues.apache.org/jira/browse/KAFKA-4609 - might mean that we
>>>>>>>>> always need to materialize the StateStore for KTable-KTable joins. If that
>>>>>>>>> is the case, then the KTable Join operators will also need Serde
>>>>>>>>> information.
>>>>>>>>
>>>>>>>> I'll update the KIP with the serdes.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Eno
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Damian
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, 16 Jan 2017 at 16:44 Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> We created "KIP-114: KTable materialization and improved semantics" to
>>>>>>>>>> solidify the KTable semantics in Kafka Streams:
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
>>>>>>>>>>
>>>>>>>>>> Your feedback is appreciated.
>>>>>>>>>> Thanks
>>>>>>>>>> Eno
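Guozhang's KStream#toTable() proposal in the thread above treats the conversion as a dummy aggregation in which each new value replaces the old one. Here is a minimal in-memory sketch, assuming records arrive as key/value pairs and, as an assumption borrowed from KTable changelog semantics, that a null value is a tombstone that deletes the key. ToTableSketch is a hypothetical name, not a Kafka Streams class.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical sketch of KStream#toTable() as a "dummy aggregation" over an
// in-memory list of records; ToTableSketch is not a Kafka Streams class.
class ToTableSketch {
    static <K, V> Map<K, V> toTable(List<Map.Entry<K, V>> records,
                                    BiPredicate<K, V> filter) {
        Map<K, V> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            // Apply the stream's filter(..) before the record reaches the table.
            if (!filter.test(record.getKey(), record.getValue())) {
                continue;
            }
            if (record.getValue() == null) {
                // Assumption: a null value is a tombstone that deletes the key,
                // as in KTable changelog semantics.
                table.remove(record.getKey());
            } else {
                // The "aggregation": the new value always replaces the old one.
                table.put(record.getKey(), record.getValue());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> topic1 = new ArrayList<>();
        topic1.add(new SimpleEntry<>("a", "1"));
        topic1.add(new SimpleEntry<>("tmp-x", "9"));
        topic1.add(new SimpleEntry<>("b", "2"));
        topic1.add(new SimpleEntry<>("a", "3"));
        topic1.add(new SimpleEntry<>("b", null));
        BiPredicate<String, String> keepNonTmp = (k, v) -> !k.startsWith("tmp-");
        Map<String, String> state1 = toTable(topic1, keepNonTmp);
        System.out.println(state1); // prints {a=3}
        // Replaying the same records through the same filter rebuilds the same state.
        System.out.println(state1.equals(toTable(topic1, keepNonTmp))); // prints true
    }
}
```

Because the table depends only on the source records and the filter, replaying "topic1" through the same filter rebuilds the same state. That is the property that would let the store be restored from "topic1" instead of from a separate "appID-state1-changelog" topic.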