I'm not a fan of materialize. I think it interrupts the flow, i.e.,

table.mapValues(..).materialize().join(..).materialize()

compared to:

table.mapValues(..).join(..)
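To make the second form concrete, here is a rough sketch of how overloads with an optional store name could behave. It is only an illustration: `SketchTable`, `isQueryable()` and the `internal-store-N` naming scheme are made up for this mail and are not the Kafka Streams `KTable` API or anything the KIP specifies.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Toy stand-in for a table; hypothetical, NOT the Kafka Streams KTable API. */
final class SketchTable<K, V> {
    // Counter behind the generated store names (assumed naming scheme).
    private static final AtomicInteger NEXT_ID = new AtomicInteger();

    private final Map<K, V> rows;
    private final String storeName;
    private final boolean queryable; // true only if the caller picked the name

    private SketchTable(Map<K, V> rows, String storeName, boolean queryable) {
        this.rows = rows;
        this.storeName = storeName;
        this.queryable = queryable;
    }

    private static String generatedName() {
        return "internal-store-" + NEXT_ID.incrementAndGet();
    }

    /** Source table with a generated store name. */
    static <K, V> SketchTable<K, V> of(Map<K, V> rows) {
        return new SketchTable<>(new LinkedHashMap<>(rows), generatedName(), false);
    }

    /** No store name given: a name is generated and the store stays internal. */
    <R> SketchTable<K, R> mapValues(Function<? super V, ? extends R> mapper) {
        Map<K, R> out = mapped(mapper);
        return new SketchTable<>(out, generatedName(), false);
    }

    /** Overload: the caller names the store because they want to query it. */
    <R> SketchTable<K, R> mapValues(Function<? super V, ? extends R> mapper, String storeName) {
        // A null name is rejected rather than treated as "do not materialize".
        Objects.requireNonNull(storeName, "storeName");
        Map<K, R> out = mapped(mapper);
        return new SketchTable<>(out, storeName, true);
    }

    private <R> Map<K, R> mapped(Function<? super V, ? extends R> mapper) {
        Map<K, R> out = new LinkedHashMap<>();
        rows.forEach((k, v) -> out.put(k, mapper.apply(v)));
        return out;
    }

    String storeName() { return storeName; }

    boolean isQueryable() { return queryable; }

    V get(K key) { return rows.get(key); }

    public static void main(String[] args) {
        SketchTable<String, Integer> source = SketchTable.of(Map.of("a", 1, "b", 2));

        // The chain reads the same whether or not a store name is supplied.
        SketchTable<String, Integer> doubled = source.mapValues(v -> v * 2);
        SketchTable<String, String> labelled = doubled.mapValues(v -> "n=" + v, "labelled-store");

        System.out.println(doubled.storeName() + " queryable=" + doubled.isQueryable());
        System.out.println(labelled.storeName() + " queryable=" + labelled.isQueryable());
    }
}
```

The point of the sketch is that the call chain never needs a separate materialize() step; naming a store is just an extra argument on the operation that produces the table.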
I know which one I prefer. My preference is still to provide overloaded methods where people can specify the store names if they want; otherwise we just generate them.

On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax <[email protected]> wrote:

> Hi,
>
> thanks for the KIP Eno! Here are my 2 cents:
>
> 1) I like Guozhang's proposal about removing store name from all KTable
> methods and generate internal names (however, I would do this as
> overloads). Furthermore, I would not force users to call .materialize()
> if they want to query a store, but add one more method .stateStoreName()
> that returns the store name if the KTable is materialized. Thus, also
> .materialize() must not necessarily have a parameter storeName (ie, we
> should have some overloads here).
>
> I would also not allow to provide a null store name (to indicate no
> materialization if not necessary) but throw an exception.
>
> This yields some simplification (see below).
>
>
> 2) I also like Guozhang's proposal about KStream#toTable()
>
>
> 3)
> >> 3. What will happen when you call materialize on KTable that is already
> >> materialized? Will it create another StateStore (providing the name is
> >> different), throw an Exception?
> >
> > Currently an exception is thrown, but see below.
>
> If we follow approach (1) from Guozhang, there is no need to worry about
> a second materialization and also no exception must be thrown. A call to
> .materialize() basically sets a "materialized flag" (ie, idempotent
> operation) and sets a new name.
>
>
> 4)
> >> Rename toStream() to toKStream() for consistency.
> >
> > Not sure whether that is really required. We also use
> > `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and
> > don't care about the "K" prefix.
>
> Eno's reply:
> > I think changing it to `toKStream` would make it absolutely clear what
> > we are converting it to.
> >
> > I'd say we should probably change the KStreamBuilder methods (but not in
> > this KIP).
>
> I would keep #toStream(). (see below)
>
>
> 5) We should not remove any methods but only deprecate them.
>
>
> A general note:
>
> I do not understand your comments "Rejected Alternatives". You say "Have
> the KTable be the materialized view" was rejected. But your KIP actually
> does exactly this -- the changelog abstraction of KTable is secondary
> after those changes and the "view" abstraction is what a KTable is. And
> just to be clear, I like this a lot:
>
> - it aligns with the name KTable
> - it aligns with stream-table-duality
> - it aligns with IQ
>
> I would say that a KTable is a "view abstraction" (as materialization is
> optional).
>
>
> -Matthias
>
>
> On 1/22/17 5:05 PM, Guozhang Wang wrote:
> > Thanks for the KIP Eno, I have a few meta comments and a few detailed
> > comments:
> >
> > 1. I like the materialize() function in general, but I would like to see
> > how other KTable functions should be updated accordingly. For example, 1)
> > KStreamBuilder.table(..) has a state store name parameter, and we will
> > always materialize the KTable unless its state store name is set to null;
> > 2) KTable.agg requires the result KTable to be materialized, and hence it
> > also have a state store name; 3) KTable.join requires the joining table to
> > be materialized. And today we do not actually have a mechanism to enforce
> > that, but will only throw an exception at runtime if it is not (e.g. if you
> > have "builder.table("topic", null).join()" a RTE will be thrown).
> >
> > I'd make an extended proposal just to kick off the discussion here: let's
> > remove all the state store params in other KTable functions, and if in some
> > cases KTable have to be materialized (e.g. KTable resulted from KXX.agg)
> > and users do not call materialize(), then we treat it as "users are not
> > interested in querying it at all" and hence use an internal name generated
> > for the materialized KTable; i.e.
> > although it is materialized the state
> > store is not exposed to users. And if users call materialize() afterwards
> > but we have already decided to materialize it, we can replace the internal
> > name with the user's provided names. Then from a user's point-view, if they
> > ever want to query a KTable, they have to call materialize() with a given
> > state store name. This approach has one awkwardness though, that serdes and
> > state store names param are not separated and could be overlapped (see
> > detailed comment #2 below).
> >
> >
> > 2. This step does not need to be included in this KIP, but just as a
> > reference / future work: as we have discussed before, we may enforce
> > materialize KTable.join resulted KTables as well in the future. If we do
> > that, then:
> >
> > a) KXX.agg resulted KTables are always materialized;
> > b) KTable.agg requires the aggregating KTable to always be materialized
> > (otherwise we would not know the old value);
> > c) KTable.join resulted KTables are always materialized, and so are the
> > joining KTables to always be materialized.
> > d) KTable.filter/mapValues resulted KTables materialization depend on its
> > parent's materialization;
> >
> > By recursive induction all KTables are actually always materialized, and
> > then the effect of the "materialize()" is just for specifying the state
> > store names. In this scenario, we do not need to send Change<V> in
> > repartition topics within joins any more, but only for repartitions topics
> > within aggregations. Instead, we can just send a "tombstone" without the
> > old value and we do not need to calculate joins twice (one more time when
> > old value is received).
> >
> > 3. I'm wondering if it is worth-while to add a "KStream#toTable()" function
> > which is interpreted as a dummy-aggregation where the new value always
> > replaces the old value.
> > I have seen a couple of use cases of this, for
> > example, users want to read a changelog topic, apply some filters, and then
> > materialize it into a KTable with state stores without creating duplicated
> > changelog topics. With materialize() and toTable I'd imagine users can
> > specify sth. like:
> >
> > "
> > KStream stream = builder.stream("topic1").filter(..);
> > KTable table = stream.toTable(..);
> > table.materialize("state1");
> > "
> >
> > And the library in this case could set store "state1" 's changelog topic to
> > be "topic1", and applying the filter on the fly while (re-)storing its
> > state by reading from this topic, instead of creating a second changelog
> > topic like "appID-state1-changelog" which is a semi-duplicate of "topic1".
> >
> >
> > Detailed:
> >
> > 1. I'm +1 with Michael regarding "#toStream"; actually I was thinking about
> > renaming to "#toChangeLog" but after thinking a bit more I think #toStream
> > is still better, and we can just mention in the javaDoc that it is
> > transforming its underlying changelog stream to a normal stream.
> > 2. As Damian mentioned, there are a few scenarios where the serdes are
> > already specified in a previous operation whereas it is not known before
> > calling materialize, for example:
> > stream.groupByKey.agg(serde).materialize(serde) v.s. table.mapValues(/*no
> > serde specified*/).materialize(serde). We need to specify what are the
> > handling logic here.
> > 3. We can remove "KTable#to" call as well, and enforce users to call
> > "KTable.toStream.to" to be more clear.
> >
> >
> > Guozhang
> >
> >
> > On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska <[email protected]>
> > wrote:
> >
> >> I think changing it to `toKStream` would make it absolutely clear what we
> >> are converting it to.
> >>
> >> I'd say we should probably change the KStreamBuilder methods (but not in
> >> this KIP).
> >>
> >> Thanks
> >> Eno
> >>
> >>> On 17 Jan 2017, at 13:59, Michael Noll <[email protected]> wrote:
> >>>
> >>>> Rename toStream() to toKStream() for consistency.
> >>>
> >>> Not sure whether that is really required. We also use
> >>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and
> >>> don't care about the "K" prefix.
> >>>
> >>>
> >>> On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska <[email protected]>
> >>> wrote:
> >>>
> >>>> Thanks Damian, answers inline:
> >>>>
> >>>>> On 16 Jan 2017, at 17:17, Damian Guy <[email protected]> wrote:
> >>>>>
> >>>>> Hi Eno,
> >>>>>
> >>>>> Thanks for the KIP. Some comments:
> >>>>>
> >>>>> 1. I'd probably rename materialized to materialize.
> >>>>
> >>>> Ok.
> >>>>
> >>>>> 2. I don't think the addition of the new Log compaction mechanism is
> >>>>> necessary for this KIP, i.e., the KIP is useful without it. Maybe that
> >>>>> should be a different KIP?
> >>>>
> >>>> Agreed, already removed. Will do a separate KIP for that.
> >>>>
> >>>>
> >>>>> 3. What will happen when you call materialize on KTable that is already
> >>>>> materialized? Will it create another StateStore (providing the name is
> >>>>> different), throw an Exception?
> >>>>
> >>>> Currently an exception is thrown, but see below.
> >>>>
> >>>>
> >>>>> 4. Have you considered overloading the existing KTable operations to add
> >>>>> a state store name? So if a state store name is provided, then materialize
> >>>>> a state store? This would be my preferred approach as I don't think
> >>>>> materialize is always a valid operation.
> >>>>
> >>>> Ok I can see your point. This will increase the KIP size since I'll need
> >>>> to enumerate all overloaded methods, but it's not a problem.
> >>>>
> >>>>> 5. The materialize method will need a value Serde as some operations,
> >>>>> i.e., mapValues, join etc can change the value types
> >>>>> 6.
> >>>>> https://issues.apache.org/jira/browse/KAFKA-4609 - might mean that we
> >>>>> always need to materialize the StateStore for KTable-KTable joins. If that
> >>>>> is the case, then the KTable Join operators will also need Serde
> >>>>> information.
> >>>>
> >>>> I'll update the KIP with the serdes.
> >>>>
> >>>> Thanks
> >>>> Eno
> >>>>
> >>>>
> >>>>>
> >>>>> Cheers,
> >>>>> Damian
> >>>>>
> >>>>>
> >>>>> On Mon, 16 Jan 2017 at 16:44 Eno Thereska <[email protected]> wrote:
> >>>>>
> >>>>>> Hello,
> >>>>>>
> >>>>>> We created "KIP-114: KTable materialization and improved semantics" to
> >>>>>> solidify the KTable semantics in Kafka Streams:
> >>>>>>
> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
> >>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-114:+KTable+materialization+and+improved+semantics>
> >>>>>>
> >>>>>> Your feedback is appreciated.
> >>>>>> Thanks
> >>>>>> Eno
