Thanks for the KIP, Eno. I have a few meta comments and a few detailed
comments:

1. I like the materialize() function in general, but I would like to see
how other KTable functions should be updated accordingly. For example, 1)
KStreamBuilder.table(..) has a state store name parameter, and we will
always materialize the KTable unless its state store name is set to null;
2) KTable.agg requires the result KTable to be materialized, and hence it
also has a state store name; 3) KTable.join requires the joining table to
be materialized. Today we do not actually have a mechanism to enforce
that, and will only throw an exception at runtime if it is not (e.g. if
you have "builder.table("topic", null).join()" a runtime exception will be
thrown).

I'd make an extended proposal just to kick off the discussion here: let's
remove all the state store params in other KTable functions. If in some
cases a KTable has to be materialized (e.g. a KTable resulting from
KXX.agg) and users do not call materialize(), then we treat it as "users
are not interested in querying it at all" and hence use an internally
generated name for the materialized KTable; i.e. although it is
materialized, the state store is not exposed to users. And if users call
materialize() afterwards but we have already decided to materialize it, we
can replace the internal name with the user-provided name. Then from a
user's point of view, if they ever want to query a KTable, they have to
call materialize() with a given state store name. This approach has one
awkwardness though: the serdes and state store name params are not
separated and could overlap (see detailed comment #2 below).
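
To make the naming rule above concrete, here is a minimal sketch in plain
Java. It is a toy model and not the Kafka Streams API: TableNode,
forceMaterialize() and queryableStoreName() are hypothetical names I made up
for illustration, and the "internal-store-N" naming scheme is an assumption.

```java
import java.util.Optional;

// Toy model only: this is NOT the Kafka Streams API. All names are hypothetical.
final class TableNode {
    private static int counter = 0;   // used to generate internal store names

    private String storeName;         // null while the table is not materialized
    private boolean userNamed;        // true only after the user calls materialize(name)

    // Called by the library when an operator (e.g. an aggregation) needs a store.
    void forceMaterialize() {
        if (storeName == null) {
            storeName = "internal-store-" + (counter++);
        }
    }

    // Called by the user; replaces an internal name if one was already chosen.
    TableNode materialize(String name) {
        storeName = name;
        userNamed = true;
        return this;
    }

    boolean isMaterialized() {
        return storeName != null;
    }

    // Only user-named stores are exposed for querying.
    Optional<String> queryableStoreName() {
        return userNamed ? Optional.of(storeName) : Optional.empty();
    }
}

class MaterializeNamingDemo {
    public static void main(String[] args) {
        TableNode table = new TableNode();
        table.forceMaterialize();                        // library decides to materialize
        System.out.println(table.queryableStoreName());  // not queryable yet
        table.materialize("state1");                     // user opts in to querying
        System.out.println(table.queryableStoreName());
    }
}
```

The point of the sketch is only that "materialized" and "queryable" become
two separate properties, with materialize() controlling the second one.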


2. This step does not need to be included in this KIP, but just as a
reference / future work: as we have discussed before, we may also enforce
materialization of KTables resulting from KTable.join in the future. If we
do that, then:

a) KXX.agg resulted KTables are always materialized;
b) KTable.agg requires the aggregating KTable to always be materialized
(otherwise we would not know the old value);
c) KTable.join resulted KTables are always materialized, and so are the
joining KTables;
d) whether KTable.filter/mapValues resulted KTables are materialized
depends on their parent's materialization.

By induction all KTables are then actually always materialized, and the
effect of "materialize()" is just to specify the state store names. In
this scenario, we do not need to send Change<V> in repartition topics
within joins any more, but only in repartition topics within
aggregations. Instead, we can just send a "tombstone" without the old
value, and we do not need to calculate joins twice (one more time when the
old value is received).
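
As an illustration of why aggregations still need the old value (point b
above), here is a small sketch in plain Java. It is a toy model, not the
Kafka Streams implementation: ChangeSketch, Change, MaterializedTable and
SumAggregate are hypothetical names, and a running sum is just one example
of an aggregate that has to retract the previous value.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only, not the Kafka Streams implementation; all names are hypothetical.
final class ChangeSketch {

    // Stand-in for Change<V>: an update carrying both the new and the old value.
    static final class Change {
        final Integer newValue;  // null means the key was deleted
        final Integer oldValue;  // null means the key did not exist before

        Change(Integer newValue, Integer oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // A materialized upstream table: because it stores the current value per key,
    // it can attach the old value to every update it forwards.
    static final class MaterializedTable {
        private final Map<String, Integer> store = new HashMap<>();

        Change put(String key, Integer value) {
            Integer old = (value == null) ? store.remove(key) : store.put(key, value);
            return new Change(value, old);
        }
    }

    // Running sum over all values of the upstream table.
    static final class SumAggregate {
        private int sum = 0;

        void apply(Change change) {
            if (change.oldValue != null) {
                sum -= change.oldValue;  // retract the previous contribution
            }
            if (change.newValue != null) {
                sum += change.newValue;  // add the new contribution
            }
        }

        int sum() {
            return sum;
        }
    }
}
```

Without the old value in the update, the sum could not be corrected when a
key is overwritten or deleted, which is why Change<V> would remain on the
aggregation path even if joins only forwarded tombstones.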

3. I'm wondering if it is worthwhile to add a "KStream#toTable()" function
which is interpreted as a dummy aggregation where the new value always
replaces the old value. I have seen a couple of use cases for this: for
example, users want to read a changelog topic, apply some filters, and then
materialize it into a KTable with state stores without creating duplicated
changelog topics. With materialize() and toTable I'd imagine users can
specify something like:

"
KStream stream = builder.stream("topic1").filter(..);
KTable table = stream.toTable(..);
table.materialize("state1");
"

And the library in this case could set the changelog topic of store
"state1" to be "topic1", and apply the filter on the fly while restoring
its state by reading from this topic, instead of creating a second
changelog topic like "appID-state1-changelog" which is a semi-duplicate of
"topic1".
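
A rough sketch of what "restore from the source topic with the filter
applied on the fly" could look like, in plain Java. This is a toy model
under my own assumptions and not library code: ToTableSketch, Record and
restore() are hypothetical names, the topic is modelled as an in-memory
list, and tombstone handling is left out for brevity.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Toy model only, not Kafka Streams code; all names are hypothetical.
final class ToTableSketch {

    // One key/value record of the source topic ("topic1" in the example above).
    static final class Record {
        final String key;
        final String value;

        Record(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Rebuild the store by replaying the source topic through the same filter
    // the topology uses. The "dummy aggregation" is a plain upsert:
    // the new value always replaces the old one.
    static Map<String, String> restore(List<Record> sourceTopic,
                                       BiPredicate<String, String> filter) {
        Map<String, String> store = new HashMap<>();
        for (Record record : sourceTopic) {
            if (filter.test(record.key, record.value)) {
                store.put(record.key, record.value);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Record> topic1 = Arrays.asList(
                new Record("a", "1"),
                new Record("b", "skip"),
                new Record("a", "2"));
        // Drop records whose value is "skip"; the later value for "a" wins.
        Map<String, String> state1 = restore(topic1, (k, v) -> !v.equals("skip"));
        System.out.println(state1);
    }
}
```

Since the store contents are a pure function of "topic1" and the filter, no
separate changelog topic is needed to rebuild it in this model.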


Detailed:

1. I'm +1 with Michael regarding "#toStream"; actually I was thinking about
renaming to "#toChangeLog" but after thinking a bit more I think #toStream
is still better, and we can just mention in the javaDoc that it is
transforming its underlying changelog stream to a normal stream.
2. As Damian mentioned, there are a few scenarios where the serdes are
already specified in a previous operation, and others where they are not
known before calling materialize, for example:
stream.groupByKey.agg(serde).materialize(serde) vs. table.mapValues(/*no
serde specified*/).materialize(serde). We need to specify what the
handling logic is here.
3. We can remove the "KTable#to" call as well, and require users to call
"KTable.toStream.to" to be more clear.


Guozhang


On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska <eno.there...@gmail.com>
wrote:

> I think changing it to `toKStream` would make it absolutely clear what we
> are converting it to.
>
> I'd say we should probably change the KStreamBuilder methods (but not in
> this KIP).
>
> Thanks
> Eno
>
> > On 17 Jan 2017, at 13:59, Michael Noll <mich...@confluent.io> wrote:
> >
> >> Rename toStream() to toKStream() for consistency.
> >
> > Not sure whether that is really required. We also use
> > `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and
> > don't care about the "K" prefix.
> >
> >
> >
> > On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska <eno.there...@gmail.com>
> > wrote:
> >
> >> Thanks Damian, answers inline:
> >>
> >>> On 16 Jan 2017, at 17:17, Damian Guy <damian....@gmail.com> wrote:
> >>>
> >>> Hi Eno,
> >>>
> >>> Thanks for the KIP. Some comments:
> >>>
> >>>  1. I'd probably rename materialized to materialize.
> >>
> >> Ok.
> >>
> >>>  2. I don't think the addition of the new Log compaction mechanism is
> >>>  necessary for this KIP, i.e, the KIP is useful without it. Maybe that
> >>>  should be a different KIP?
> >>
> >> Agreed, already removed. Will do a separate KIP for that.
> >>
> >>
> >>>  3. What will happen when you call materialize on KTable that is
> >>>  already materialized? Will it create another StateStore (providing the
> >>>  name is different), throw an Exception?
> >>
> >> Currently an exception is thrown, but see below.
> >>
> >>
> >>>  4. Have you considered overloading the existing KTable operations to
> >>>  add a state store name? So if a state store name is provided, then
> >>>  materialize a state store? This would be my preferred approach as i
> >>>  don't think materialize is always a valid operation.
> >>
> >> Ok I can see your point. This will increase the KIP size since I'll need
> >> to enumerate all overloaded methods, but it's not a problem.
> >>
> >>>  5. The materialize method will need a value Serde as some operations,
> >>>  i.e., mapValues, join etc can change the value types
> >>>  6. https://issues.apache.org/jira/browse/KAFKA-4609 - might mean that
> >>>  we always need to materialize the StateStore for KTable-KTable joins.
> >>>  If that is the case, then the KTable Join operators will also need
> >>>  Serde information.
> >>
> >> I'll update the KIP with the serdes.
> >>
> >> Thanks
> >> Eno
> >>
> >>
> >>>
> >>> Cheers,
> >>> Damian
> >>>
> >>>
> >>> On Mon, 16 Jan 2017 at 16:44 Eno Thereska <eno.there...@gmail.com> wrote:
> >>>
> >>>> Hello,
> >>>>
> >>>> We created "KIP-114: KTable materialization and improved semantics" to
> >>>> solidify the KTable semantics in Kafka Streams:
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
> >>>>
> >>>> Your feedback is appreciated.
> >>>> Thanks
> >>>> Eno
> >>
> >>
>
>


-- 
-- Guozhang
