Hi Gwen,

this is not a hint as in "make it smarter" this is a hint as to "make it work" wich should not require hinting.

Best Jan




On 27.01.2017 22:35, Gwen Shapira wrote:
Another vote in favor of overloading. I think the streams API actually
trains users quite well in realizing the implications of adding a
state-store - we need to figure out the correct Serde every single
time :)

Another option: "materialize" behaves almost as a SQL hint - i.e.
allows a user to control an implementation detail while working inside
a DSL that usually hides them. We should consider that this may not be
the last hint we'll need ("cache results", "predicate pushdown", hash
join vs merge join, etc), but in most cases, we won't be able to infer
a hint from the existence of an argument like state-store name.
Mathias suggestion to make .materialize() a top level method is
awkward precisely because it doesn't fit into the DSL model very well,
but if we have a generalized way to "hint" at operations, this could
be a good fit.

On Fri, Jan 27, 2017 at 7:49 AM, Michael Noll <mich...@confluent.io> wrote:
Like Damian, and for the same reasons, I am more in favor of overloading
methods rather than introducing `materialize()`.
FWIW, we already have a similar API setup for e.g.
`KTable#through(topicName, stateStoreName)`.

A related but slightly different question is what e.g. Jan Filipiak
mentioned earlier in this thread:
I think we need to explain more clearly why KIP-114 doesn't propose the
seemingly simpler solution of always materializing tables/state stores.



On Fri, Jan 27, 2017 at 4:38 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

Hi,

Yeah its confusing, Why shoudn't it be querable by IQ? If you uses the
ValueGetter of Filter it will apply the filter and should be completely
transparent as to if another processor or IQ is accessing it? How can this
new method help?

I cannot see the reason for the additional materialize method being
required! Hence I suggest leave it alone.
regarding removing the others I dont have strong opinions and it seems to
be unrelated.

Best Jan




On 26.01.2017 20:48, Eno Thereska wrote:

Forwarding this thread to the users list too in case people would like to
comment. It is also on the dev list.

Thanks
Eno

Begin forwarded message:
From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved
semantics
Date: 24 January 2017 at 19:30:10 GMT
To: d...@kafka.apache.org
Reply-To: d...@kafka.apache.org

That not what I meant by "huge impact".

I refer to the actions related to materialize a KTable: creating a
RocksDB store and a changelog topic -- users should be aware about
runtime implication and this is better expressed by an explicit method
call, rather than implicitly triggered by using a different overload of
a method.


-Matthias

On 1/24/17 1:35 AM, Damian Guy wrote:

I think your definition of a huge impact and mine are rather different
;-P
Overloading a few methods  is not really a huge impact IMO. It is also a
sacrifice worth making for readability, usability of the API.

On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax <matth...@confluent.io>
wrote:

I understand your argument, but do not agree with it.
Your first version (even if the "flow" is not as nice) is more explicit
than the second version. Adding a stateStoreName parameter is quite
implicit but has a huge impact -- thus, I prefer the rather more
verbose
but explicit version.


-Matthias

On 1/23/17 1:39 AM, Damian Guy wrote:

I'm not a fan of materialize. I think it interrupts the flow, i.e,

table.mapValue(..).materialize().join(..).materialize()
compared to:
table.mapValues(..).join(..)

I know which one i prefer.
My preference is stil to provide overloaded methods where people can
specify the store names if they want, otherwise we just generate them.

On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax <matth...@confluent.io>

wrote:

Hi,
thanks for the KIP Eno! Here are my 2 cents:

1) I like Guozhang's proposal about removing store name from all
KTable
methods and generate internal names (however, I would do this as
overloads). Furthermore, I would not force users to call
.materialize()
if they want to query a store, but add one more method
.stateStoreName()
that returns the store name if the KTable is materialized. Thus, also
.materialize() must not necessarily have a parameter storeName (ie,
we
should have some overloads here).

I would also not allow to provide a null store name (to indicate no
materialization if not necessary) but throw an exception.

This yields some simplification (see below).


2) I also like Guozhang's proposal about KStream#toTable()


3)

   3. What will happen when you call materialize on KTable that is
already
   materialized? Will it create another StateStore (providing the
name

is
   different), throw an Exception?
Currently an exception is thrown, but see below.


If we follow approach (1) from Guozhang, there is no need to worry
about
a second materialization and also no exception must be throws. A
call to
.materialize() basically sets a "materialized flag" (ie, idempotent
operation) and sets a new name.


4)

Rename toStream() to toKStream() for consistency.
Not sure whether that is really required. We also use
`KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example,

and
don't care about the "K" prefix.
Eno's reply:

I think changing it to `toKStream` would make it absolutely clear
what

we are converting it to.

I'd say we should probably change the KStreamBuilder methods (but
not

in
this KIP).
I would keep #toStream(). (see below)


5) We should not remove any methods but only deprecate them.



A general note:

I do not understand your comments "Rejected Alternatives". You say
"Have
the KTable be the materialized view" was rejected. But your KIP
actually
does exactly this -- the changelog abstraction of KTable is secondary
after those changes and the "view" abstraction is what a KTable is.
And
just to be clear, I like this a lot:

- it aligns with the name KTable
- is aligns with stream-table-duality
- it aligns with IQ

I would say that a KTable is a "view abstraction" (as
materialization is
optional).



-Matthias




On 1/22/17 5:05 PM, Guozhang Wang wrote:

Thanks for the KIP Eno, I have a few meta comments and a few
detailed
comments:

1. I like the materialize() function in general, but I would like to

see
how other KTable functions should be updated accordingly. For example,
1)
KStreamBuilder.table(..) has a state store name parameter, and we will
always materialize the KTable unless its state store name is set to

null;
2) KTable.agg requires the result KTable to be materialized, and hence
it
also have a state store name; 3) KTable.join requires the joining
table

to

be materialized. And today we do not actually have a mechanism to

enforce
that, but will only throw an exception at runtime if it is not (e.g.
if

you

have "builder.table("topic", null).join()" a RTE will be thrown).

I'd make an extended proposal just to kick off the discussion here:

let's
remove all the state store params in other KTable functions, and if in
some

cases KTable have to be materialized (e.g. KTable resulted from

KXX.agg)
and users do not call materialize(), then we treat it as "users are
not
interested in querying it at all" and hence use an internal name

generated

for the materialized KTable; i.e. although it is materialized the
state
store is not exposed to users. And if users call materialize()

afterwards
but we have already decided to materialize it, we can replace the
internal

name with the user's provided names. Then from a user's point-view,
if

they

ever want to query a KTable, they have to call materialize() with a

given
state store name. This approach has one awkwardness though, that
serdes

and

state store names param are not separated and could be overlapped
(see
detailed comment #2 below).


2. This step does not need to be included in this KIP, but just as a
reference / future work: as we have discussed before, we may enforce
materialize KTable.join resulted KTables as well in the future. If
we

do
that, then:
a) KXX.agg resulted KTables are always materialized;
b) KTable.agg requires the aggregating KTable to always be
materialized
(otherwise we would not know the old value);
c) KTable.join resulted KTables are always materialized, and so are
the
joining KTables to always be materialized.
d) KTable.filter/mapValues resulted KTables materialization depend
on

its
parent's materialization;
By recursive induction all KTables are actually always materialized,

and
then the effect of the "materialize()" is just for specifying the
state
store names. In this scenario, we do not need to send Change<V> in
repartition topics within joins any more, but only for repartitions

topics

within aggregations. Instead, we can just send a "tombstone" without

the
old value and we do not need to calculate joins twice (one more time
when
old value is received).
3. I'm wondering if it is worth-while to add a "KStream#toTable()"

function

which is interpreted as a dummy-aggregation where the new value
always
replaces the old value. I have seen a couple of use cases of this,
for
example, users want to read a changelog topic, apply some filters,
and

then

materialize it into a KTable with state stores without creating

duplicated

changelog topics. With materialize() and toTable I'd imagine users
can
specify sth. like:

"
KStream stream = builder.stream("topic1").filter(..);
KTable table = stream.toTable(..);
table.materialize("state1");
"

And the library in this case could set store "state1" 's changelog

topic
to
be "topic1", and applying the filter on the fly while (re-)storing
its
state by reading from this topic, instead of creating a second

changelog
topic like "appID-state1-changelog" which is a semi-duplicate of
"topic1".

Detailed:

1. I'm +1 with Michael regarding "#toStream"; actually I was
thinking

about

renaming to "#toChangeLog" but after thinking a bit more I think

#toStream

is still better, and we can just mention in the javaDoc that it is
transforming its underlying changelog stream to a normal stream.
2. As Damian mentioned, there are a few scenarios where the serdes
are
already specified in a previous operation whereas it is not known

before
calling materialize, for example:
stream.groupByKey.agg(serde).materialize(serde) v.s.

table.mapValues(/*no
serde specified*/).materialize(serde). We need to specify what are
the
handling logic here.
3. We can remove "KTable#to" call as well, and enforce users to
call "
KTable.toStream.to" to be more clear.


Guozhang


On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska <
eno.there...@gmail.com>
wrote:

I think changing it to `toKStream` would make it absolutely clear
what

we
are converting it to.
I'd say we should probably change the KStreamBuilder methods (but
not

in
this KIP).
Thanks
Eno

On 17 Jan 2017, at 13:59, Michael Noll <mich...@confluent.io>
wrote:

Rename toStream() to toKStream() for consistency.
Not sure whether that is really required. We also use
`KStreamBuilder#stream()` and `KStreamBuilder#table()`, for
example,

and
don't care about the "K" prefix.


On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska <

eno.there...@gmail.com
wrote:
Thanks Damian, answers inline:
On 16 Jan 2017, at 17:17, Damian Guy <damian....@gmail.com>
wrote:

Hi Eno,

Thanks for the KIP. Some comments:

1. I'd probably rename materialized to materialize.

Ok.

2. I don't think the addition of the new Log compaction mechanism
is
necessary for this KIP, i.e, the KIP is useful without it. Maybe
that
should be a different KIP?
Agreed, already removed. Will do a separate KIP for that.


3. What will happen when you call materialize on KTable that is
already
materialized? Will it create another StateStore (providing the
name
is
different), throw an Exception?
Currently an exception is thrown, but see below.


4. Have you considered overloading the existing KTable operations
to
add
a state store name? So if a state store name is provided, then

materialize

a state store? This would be my preferred approach as i don't

think
materialize is always a valid operation.
Ok I can see your point. This will increase the KIP size since
I'll

need
to enumerate all overloaded methods, but it's not a problem.
5. The materialize method will need ta value Serde as some
operations,
i.e., mapValues, join etc can change the value types
6. https://issues.apache.org/jira/browse/KAFKA-4609 - might
mean

that
we
always need to materialize the StateStore for KTable-KTable
joins.

If
that
is the case, then the KTable Join operators will also need Serde
information.

I'll update the KIP with the serdes.

Thanks
Eno


Cheers,
Damian


On Mon, 16 Jan 2017 at 16:44 Eno Thereska <
eno.there...@gmail.com>

wrote:

Hello,
We created "KIP-114: KTable materialization and improved

semantics"
to
solidify the KTable semantics in Kafka Streams:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-

114%3A+KTable+materialization+and+improved+semantics
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-

114:+KTable+materialization+and+improved+semantics
Your feedback is appreciated.
Thanks
Eno




Reply via email to