Like Damian, and for the same reasons, I favor overloading methods over
introducing `materialize()`.
FWIW, we already have a similar API setup for e.g.
`KTable#through(topicName, stateStoreName)`.
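
To make the two API shapes under discussion concrete, here is a minimal
sketch. Note that `ToyTable` and all of its methods are hypothetical
stand-ins made up for illustration; this is not the real KTable API, nor
the signatures proposed in the KIP. It only contrasts "pass a store name
to an overload" with "call an explicit materialize()", and it rejects a
null store name as Matthias suggested.

```java
import java.util.Objects;
import java.util.Optional;

/** Toy stand-in for a table; NOT the real Kafka Streams KTable. */
final class ToyTable {
    // Present only if the table is backed by a user-named, queryable store.
    private final Optional<String> storeName;

    private ToyTable(Optional<String> storeName) {
        this.storeName = storeName;
    }

    static ToyTable source() {
        return new ToyTable(Optional.empty());
    }

    // Overload style: the plain variant does not expose a queryable store.
    ToyTable mapValues() {
        return new ToyTable(Optional.empty());
    }

    // Overload style: supplying a store name requests materialization.
    ToyTable mapValues(String storeName) {
        Objects.requireNonNull(storeName, "storeName must not be null");
        return new ToyTable(Optional.of(storeName));
    }

    // Explicit style: a separate call that names the store.
    ToyTable materialize(String storeName) {
        Objects.requireNonNull(storeName, "storeName must not be null");
        return new ToyTable(Optional.of(storeName));
    }

    Optional<String> storeName() {
        return storeName;
    }
}

class ApiStylesDemo {
    public static void main(String[] args) {
        // Overload style: one call, materialization implied by the argument.
        ToyTable viaOverload = ToyTable.source().mapValues("mapped-store");

        // Explicit style: an extra call in the chain makes the cost visible.
        ToyTable viaMaterialize =
                ToyTable.source().mapValues().materialize("mapped-store");

        System.out.println(viaOverload.storeName().isPresent());
        System.out.println(viaMaterialize.storeName().isPresent());
    }
}
```

In this toy model both styles end up with the same named store; the
disagreement in the thread is only about which call site makes the
runtime cost more obvious to the user.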

A related but slightly different question is what e.g. Jan Filipiak
mentioned earlier in this thread:
I think we need to explain more clearly why KIP-114 doesn't propose the
seemingly simpler solution of always materializing tables/state stores.



On Fri, Jan 27, 2017 at 4:38 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

> Hi,
>
> Yeah, it's confusing. Why shouldn't it be queryable by IQ? If you use the
> ValueGetter of Filter, it will apply the filter and should be completely
> transparent as to whether another processor or IQ is accessing it. How can
> this new method help?
>
> I cannot see the reason for the additional materialize method being
> required! Hence I suggest leaving it alone.
> Regarding removing the others, I don't have strong opinions, and it seems
> to be unrelated.
>
> Best Jan
>
>
>
>
> On 26.01.2017 20:48, Eno Thereska wrote:
>
>> Forwarding this thread to the users list too in case people would like to
>> comment. It is also on the dev list.
>>
>> Thanks
>> Eno
>>
>> Begin forwarded message:
>>>
>>> From: "Matthias J. Sax" <matth...@confluent.io>
>>> Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved
>>> semantics
>>> Date: 24 January 2017 at 19:30:10 GMT
>>> To: dev@kafka.apache.org
>>> Reply-To: dev@kafka.apache.org
>>>
>>> That's not what I meant by "huge impact".
>>>
>>> I refer to the actions involved in materializing a KTable: creating a
>>> RocksDB store and a changelog topic -- users should be aware of the
>>> runtime implications, and this is better expressed by an explicit method
>>> call than implicitly triggered by using a different overload of a
>>> method.
>>>
>>>
>>> -Matthias
>>>
>>> On 1/24/17 1:35 AM, Damian Guy wrote:
>>>
>>>> I think your definition of a huge impact and mine are rather different
>>>> ;-P
>>>> Overloading a few methods is not really a huge impact IMO. It is also a
>>>> sacrifice worth making for readability and usability of the API.
>>>>
>>>> On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> I understand your argument, but do not agree with it.
>>>>>
>>>>> Your first version (even if the "flow" is not as nice) is more explicit
>>>>> than the second version. Adding a stateStoreName parameter is quite
>>>>> implicit but has a huge impact -- thus, I prefer the rather more
>>>>> verbose but explicit version.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 1/23/17 1:39 AM, Damian Guy wrote:
>>>>>
>>>>>> I'm not a fan of materialize. I think it interrupts the flow, i.e.,
>>>>>>
>>>>>> table.mapValues(..).materialize().join(..).materialize()
>>>>>> compared to:
>>>>>> table.mapValues(..).join(..)
>>>>>>
>>>>>> I know which one I prefer.
>>>>>> My preference is still to provide overloaded methods where people can
>>>>>> specify the store names if they want, otherwise we just generate them.
>>>>>>
>>>>>> On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax <matth...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> thanks for the KIP Eno! Here are my 2 cents:
>>>>>>>
>>>>>>> 1) I like Guozhang's proposal about removing store name from all
>>>>>>> KTable methods and generate internal names (however, I would do this
>>>>>>> as overloads). Furthermore, I would not force users to call
>>>>>>> .materialize() if they want to query a store, but add one more method
>>>>>>> .stateStoreName() that returns the store name if the KTable is
>>>>>>> materialized. Thus, also .materialize() must not necessarily have a
>>>>>>> parameter storeName (ie, we should have some overloads here).
>>>>>>>
>>>>>>> I would also not allow to provide a null store name (to indicate no
>>>>>>> materialization if not necessary) but throw an exception.
>>>>>>>
>>>>>>> This yields some simplification (see below).
>>>>>>>
>>>>>>>
>>>>>>> 2) I also like Guozhang's proposal about KStream#toTable()
>>>>>>>
>>>>>>>
>>>>>>> 3)
>>>>>>>
>>>>>>>>> 3. What will happen when you call materialize on KTable that is
>>>>>>>>> already materialized? Will it create another StateStore (providing
>>>>>>>>> the name is different), throw an Exception?
>>>>>>>>
>>>>>>>> Currently an exception is thrown, but see below.
>>>>>>>
>>>>>>> If we follow approach (1) from Guozhang, there is no need to worry
>>>>>>> about a second materialization, and no exception needs to be thrown. A
>>>>>>> call to .materialize() basically sets a "materialized flag" (i.e., an
>>>>>>> idempotent operation) and sets a new name.
>>>>>>>
>>>>>>>
>>>>>>> 4)
>>>>>>>
>>>>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>>>>
>>>>>>>> Not sure whether that is really required. We also use
>>>>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example,
>>>>>>>> and don't care about the "K" prefix.
>>>>>>>
>>>>>>> Eno's reply:
>>>>>>>
>>>>>>>> I think changing it to `toKStream` would make it absolutely clear
>>>>>>>> what we are converting it to.
>>>>>>>>
>>>>>>>> I'd say we should probably change the KStreamBuilder methods (but
>>>>>>>> not in this KIP).
>>>>>>>
>>>>>>> I would keep #toStream(). (see below)
>>>>>>>
>>>>>>>
>>>>>>> 5) We should not remove any methods but only deprecate them.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> A general note:
>>>>>>>
>>>>>>> I do not understand your comments "Rejected Alternatives". You say
>>>>>>> "Have the KTable be the materialized view" was rejected. But your KIP
>>>>>>> actually does exactly this -- the changelog abstraction of KTable is
>>>>>>> secondary after those changes and the "view" abstraction is what a
>>>>>>> KTable is. And just to be clear, I like this a lot:
>>>>>>>
>>>>>>> - it aligns with the name KTable
>>>>>>> - it aligns with stream-table-duality
>>>>>>> - it aligns with IQ
>>>>>>>
>>>>>>> I would say that a KTable is a "view abstraction" (as
>>>>>>> materialization is optional).
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 1/22/17 5:05 PM, Guozhang Wang wrote:
>>>>>>>
>>>>>>>> Thanks for the KIP Eno, I have a few meta comments and a few
>>>>>>>> detailed
>>>>>>>> comments:
>>>>>>>>
>>>>>>>> 1. I like the materialize() function in general, but I would like to
>>>>>>>> see how other KTable functions should be updated accordingly. For
>>>>>>>> example, 1) KStreamBuilder.table(..) has a state store name
>>>>>>>> parameter, and we will always materialize the KTable unless its state
>>>>>>>> store name is set to null; 2) KTable.agg requires the result KTable
>>>>>>>> to be materialized, and hence it also has a state store name;
>>>>>>>> 3) KTable.join requires the joining table to be materialized. And
>>>>>>>> today we do not actually have a mechanism to enforce that, but will
>>>>>>>> only throw an exception at runtime if it is not (e.g. if you have
>>>>>>>> "builder.table("topic", null).join()" a RTE will be thrown).
>>>>>>>>
>>>>>>>> I'd make an extended proposal just to kick off the discussion here:
>>>>>>>> let's remove all the state store params in other KTable functions,
>>>>>>>> and if in some cases a KTable has to be materialized (e.g. a KTable
>>>>>>>> resulting from KXX.agg) and users do not call materialize(), then we
>>>>>>>> treat it as "users are not interested in querying it at all" and
>>>>>>>> hence use an internally generated name for the materialized KTable;
>>>>>>>> i.e. although it is materialized, the state store is not exposed to
>>>>>>>> users. And if users call materialize() afterwards but we have already
>>>>>>>> decided to materialize it, we can replace the internal name with the
>>>>>>>> user-provided name. Then, from a user's point of view, if they ever
>>>>>>>> want to query a KTable, they have to call materialize() with a given
>>>>>>>> state store name. This approach has one awkwardness though: the
>>>>>>>> serdes and state store name params are not separated and could
>>>>>>>> overlap (see detailed comment #2 below).
>>>>>>>>
>>>>>>>>
>>>>>>>> 2. This step does not need to be included in this KIP, but just as a
>>>>>>>> reference / future work: as we have discussed before, we may enforce
>>>>>>>> materializing KTable.join resulted KTables as well in the future. If
>>>>>>>> we do that, then:
>>>>>>>>
>>>>>>>> a) KXX.agg resulted KTables are always materialized;
>>>>>>>> b) KTable.agg requires the aggregating KTable to always be
>>>>>>>> materialized (otherwise we would not know the old value);
>>>>>>>> c) KTable.join resulted KTables are always materialized, and so are
>>>>>>>> the joining KTables;
>>>>>>>> d) KTable.filter/mapValues resulted KTables' materialization depends
>>>>>>>> on their parent's materialization.
>>>>>>>>
>>>>>>>> By recursive induction all KTables are actually always materialized,
>>>>>>>> and then the effect of "materialize()" is just to specify the state
>>>>>>>> store names. In this scenario, we do not need to send Change<V> in
>>>>>>>> repartition topics within joins any more, but only in repartition
>>>>>>>> topics within aggregations. Instead, we can just send a "tombstone"
>>>>>>>> without the old value and we do not need to calculate joins twice
>>>>>>>> (one more time when the old value is received).
>>>>>>>>
>>>>>>>> 3. I'm wondering if it is worthwhile to add a "KStream#toTable()"
>>>>>>>> function which is interpreted as a dummy-aggregation where the new
>>>>>>>> value always replaces the old value. I have seen a couple of use
>>>>>>>> cases of this, for example, users want to read a changelog topic,
>>>>>>>> apply some filters, and then materialize it into a KTable with state
>>>>>>>> stores without creating duplicated changelog topics. With
>>>>>>>> materialize() and toTable I'd imagine users can specify sth. like:
>>>>>>>>
>>>>>>>> "
>>>>>>>> KStream stream = builder.stream("topic1").filter(..);
>>>>>>>> KTable table = stream.toTable(..);
>>>>>>>> table.materialize("state1");
>>>>>>>> "
>>>>>>>>
>>>>>>>> And the library in this case could set store "state1"'s changelog
>>>>>>>> topic to be "topic1", and apply the filter on the fly while
>>>>>>>> (re-)storing its state by reading from this topic, instead of
>>>>>>>> creating a second changelog topic like "appID-state1-changelog"
>>>>>>>> which is a semi-duplicate of "topic1".
>>>>>>>>
>>>>>>>> Detailed:
>>>>>>>>
>>>>>>>> 1. I'm +1 with Michael regarding "#toStream"; actually I was
>>>>>>>> thinking about renaming to "#toChangeLog" but after thinking a bit
>>>>>>>> more I think #toStream is still better, and we can just mention in
>>>>>>>> the javaDoc that it is transforming its underlying changelog stream
>>>>>>>> to a normal stream.
>>>>>>>> 2. As Damian mentioned, there are a few scenarios where the serdes
>>>>>>>> are already specified in a previous operation whereas they are not
>>>>>>>> known before calling materialize, for example:
>>>>>>>> stream.groupByKey.agg(serde).materialize(serde) vs.
>>>>>>>> table.mapValues(/*no serde specified*/).materialize(serde). We need
>>>>>>>> to specify what the handling logic is here.
>>>>>>>> 3. We can remove the "KTable#to" call as well, and require users to
>>>>>>>> call "KTable.toStream.to" to be more clear.
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska
>>>>>>>> <eno.there...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I think changing it to `toKStream` would make it absolutely clear
>>>>>>>>> what we are converting it to.
>>>>>>>>>
>>>>>>>>> I'd say we should probably change the KStreamBuilder methods (but
>>>>>>>>> not in this KIP).
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Eno
>>>>>>>>>
>>>>>>>>> On 17 Jan 2017, at 13:59, Michael Noll <mich...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>>>>>>
>>>>>>>>>> Not sure whether that is really required. We also use
>>>>>>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for
>>>>>>>>>> example, and don't care about the "K" prefix.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska
>>>>>>>>>> <eno.there...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Damian, answers inline:
>>>>>>>>>>>
>>>>>>>>>>> On 16 Jan 2017, at 17:17, Damian Guy <damian....@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Eno,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the KIP. Some comments:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. I'd probably rename materialized to materialize.
>>>>>>>>>>>
>>>>>>>>>>> Ok.
>>>>>>>>>>>
>>>>>>>>>>>> 2. I don't think the addition of the new Log compaction mechanism
>>>>>>>>>>>> is necessary for this KIP, i.e, the KIP is useful without it.
>>>>>>>>>>>> Maybe that should be a different KIP?
>>>>>>>>>>>
>>>>>>>>>>> Agreed, already removed. Will do a separate KIP for that.
>>>>>>>>>>>
>>>>>>>>>>>> 3. What will happen when you call materialize on KTable that is
>>>>>>>>>>>> already materialized? Will it create another StateStore
>>>>>>>>>>>> (providing the name is different), throw an Exception?
>>>>>>>>>>>
>>>>>>>>>>> Currently an exception is thrown, but see below.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> 4. Have you considered overloading the existing KTable operations
>>>>>>>>>>>> to add a state store name? So if a state store name is provided,
>>>>>>>>>>>> then materialize a state store? This would be my preferred
>>>>>>>>>>>> approach as I don't think materialize is always a valid
>>>>>>>>>>>> operation.
>>>>>>>>>>>
>>>>>>>>>>> Ok I can see your point. This will increase the KIP size since
>>>>>>>>>>> I'll need to enumerate all overloaded methods, but it's not a
>>>>>>>>>>> problem.
>>>>>>>>>>>
>>>>>>>>>>>> 5. The materialize method will need a value Serde as some
>>>>>>>>>>>> operations, i.e., mapValues, join etc can change the value types
>>>>>>>>>>>> 6. https://issues.apache.org/jira/browse/KAFKA-4609 - might mean
>>>>>>>>>>>> that we always need to materialize the StateStore for
>>>>>>>>>>>> KTable-KTable joins. If that is the case, then the KTable Join
>>>>>>>>>>>> operators will also need Serde information.
>>>>>>>>>>>
>>>>>>>>>>> I'll update the KIP with the serdes.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Eno
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Damian
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, 16 Jan 2017 at 16:44 Eno Thereska <eno.there...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>
>>>>>>>>>>>>> We created "KIP-114: KTable materialization and improved
>>>>>>>>>>>>> semantics" to solidify the KTable semantics in Kafka Streams:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
>>>>>>>>>>>>>
>>>>>>>>>>>>> Your feedback is appreciated.
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Eno
