Hi Eno,

Thanks for splitting this into separate points. I want to add a few remarks inline.

Best Jan

On 30.01.2017 12:19, Eno Thereska wrote:
So I think there are several important discussion threads that are emerging 
here. Let me try to tease them apart:

1. inconsistency in what is materialized and what is not, what is queryable and 
what is not. I think we all agree there is some inconsistency there and this 
will be addressed with any of the proposed approaches. Addressing the 
inconsistency is the point of the original KIP.

2. the exact API for materializing a KTable. We can specify 1) a "store name" (as we do today) or 
2) have a ".materialize[d]" call or 3) get a handle from a KTable ".getQueryHandle" or 4) 
have a builder construct. So we have discussed 4 options. It is important to remember in this discussion that 
IQ is not designed for just local queries, but also for distributed queries. In all cases an identifying 
name/id is needed for the store that the user is interested in querying. So we end up with a discussion on 
who provides the name, the user (as done today) or if it is generated automatically (as Jan suggests, as I 
understand it). If it is generated automatically we need a way to expose these auto-generated names to the 
users and link them to the KTables they care to query.
Hi, the last sentence is what I am currently arguing against. The user would never see a string-type identifier, a name, or anything like that. All he gets is the queryHandle; if he executes a get(K) on it, that will be an interactive query get, with all the work of finding the right servers that currently hold a copy of the underlying store going on behind the scenes. The nice part is that if someone retrieves a queryHandle, you know that you have to materialize (if you are not doing so already), as queries will be coming. That takes away the confusion mentioned in point 1, IMO.

3. The exact boundary between the DSL, that is the processing language, and the 
storage/IQ queries, and how we jump from one to the other. This is mostly for 
how we get a handle on a store (so it's related to point 2), rather than for 
how we query the store. I think we all agree that we don't want to limit ways 
one can query a store (e.g., using gets or range queries etc) and the query 
APIs are not in the scope of the DSL.
Does IQ currently work with range queries? A range query would have to be started on all stores and then merged, maybe by the client. Range queries currently force a flush to RocksDB, so I am sure you would get a performance hit right there. Time windows might be okay, but I am not sure the first version should offer the user range access.
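As a rough illustration of the "start the range on all stores, then merge on the client" step mentioned above, here is a minimal sketch in plain Java. It is not the IQ API: `RangeMergeSketch` and its `range` method are hypothetical names, the per-instance stores are modelled as in-memory sorted maps, and it assumes each key lives on exactly one instance.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical client-side merge; the stores are plain sorted maps here,
// not real Kafka Streams state stores.
final class RangeMergeSketch {

    // Runs the same inclusive [from, to] range on every store and merges by key.
    static NavigableMap<String, Long> range(List<NavigableMap<String, Long>> stores,
                                            String from, String to) {
        NavigableMap<String, Long> merged = new TreeMap<>();
        for (NavigableMap<String, Long> store : stores) {
            // Assumption: a key belongs to exactly one store (one partition),
            // so putAll never overwrites a value coming from another instance.
            merged.putAll(store.subMap(from, true, to, true));
        }
        return merged;
    }

    public static void main(String[] args) {
        NavigableMap<String, Long> instanceA = new TreeMap<>();
        instanceA.put("apple", 1L);
        instanceA.put("cherry", 3L);
        NavigableMap<String, Long> instanceB = new TreeMap<>();
        instanceB.put("banana", 2L);
        instanceB.put("zebra", 9L);
        System.out.println(range(Arrays.asList(instanceA, instanceB), "a", "d"));
    }
}
```

The sketch says nothing about the flush cost raised above; it only shows that the merge itself is cheap once every instance has answered.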

4. The nature of the DSL and whether it's declarative enough, or flexible 
enough. Damian made the point that he likes the builder pattern since users can 
specify, per KTable, things like caching and logging needs. His observation (as 
I understand it) is that the processor API (PAPI) is flexible but doesn't 
provide any help at all to users. The current DSL provides declarative 
abstractions, but it's not fine-grained enough. This point is much broader than 
the KIP, but discussing it in this KIP's context is ok, since we don't want to 
make small piecemeal changes and then realise we're not in the spot we want to 
be.
This is indeed much broader. My guess is that this is why both APIs exist, and helping users switch back and forth between them might be worthwhile.

Feel free to pitch in if I have misinterpreted something.

Thanks
Eno


On 30 Jan 2017, at 10:22, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi Eno,

I have a really hard time understanding why we can't. From my point of view 
everything could be super elegant: DSL only, plus the public API for the PAPI 
people as it already exists.

The above approach of implementing a .get(K) on KTable is foolish in my opinion, 
as it would be too late to know that materialisation would be required.
But having an API that allows one to indicate "I want to query this table" and 
then wrapping, say, the table's processor name can work out really nicely. The 
only obstacle I see is people not willing to spend the additional time on the 
implementation and just wanting a quick-shot option to make it work.

For me it would look like this:

table = builder.table()
filteredTable = table.filter()
// Does the materialisation. Really any name is possible, but I'd rather hide
// the implication that it materializes.
rawHandle = table.getQueryHandle()
// This would _not_ materialize again, of course; the source or the aggregator
// would stay the only materialized processors.
filteredTableHandle = filteredTable.getQueryHandle()
streams = new streams(builder)

This middle part is highly flexible. I could imagine forcing the user to do 
something like the following. It implies to the user that his streams instance 
needs to be running, instead of propagating the missing initialisation back via 
exceptions. Whether the user is forced to pass the appropriate streams instance 
back can also change. I think it's possible to build multiple streams instances 
out of one topology, so this would be the easiest to implement as well. This is 
just what I would probably have liked the most:

streams.start();
rawHandle.prepare(streams)
filteredTableHandle.prepare(streams)

Later the user can do:

V value = rawHandle.get(K)
V value = filteredTableHandle.get(K)

This could free DSL users from anything like store names and from how and what 
to materialize. Can someone indicate what the problem would be with implementing 
it like this?
Yes, I am aware that the current IQ API will not support querying by 
KTableProcessorName instead of stateStoreName. But I think that has to change 
if you want it to be intuitive.
IMO you have to apply the filter at read time.
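To make the "handle implies materialisation, filter applied at read time" idea a bit more concrete, here is a minimal sketch in plain Java. None of these types exist in Kafka Streams: `SourceStoreSketch`, `QueryHandleSketch` and `TableSketch` are hypothetical stand-ins, the store is just a `HashMap`, and the distributed lookup and the `prepare(streams)` step are left out entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical types for illustration only; none of them exist in Kafka Streams.
final class SourceStoreSketch<K, V> {
    final Map<K, V> data = new HashMap<>(); // stands in for the single physical store
    boolean materialized = false;           // flipped once somebody asks for a handle
}

final class QueryHandleSketch<K, V> {
    private final SourceStoreSketch<K, V> source;
    private final Predicate<V> readFilter;

    QueryHandleSketch(SourceStoreSketch<K, V> source, Predicate<V> readFilter) {
        this.source = source;
        this.readFilter = readFilter;
    }

    // Reads from the source store and applies the filter at read time.
    V get(K key) {
        V value = source.data.get(key);
        return (value != null && readFilter.test(value)) ? value : null;
    }
}

final class TableSketch<K, V> {
    private final SourceStoreSketch<K, V> source;
    private final Predicate<V> predicate;

    TableSketch(SourceStoreSketch<K, V> source, Predicate<V> predicate) {
        this.source = source;
        this.predicate = predicate;
    }

    // A filtered table shares the source store; it only remembers the predicate.
    TableSketch<K, V> filter(Predicate<V> extra) {
        return new TableSketch<>(source, predicate.and(extra));
    }

    // Asking for a handle is the signal that queries will come, so materialize.
    QueryHandleSketch<K, V> getQueryHandle() {
        source.materialized = true;
        return new QueryHandleSketch<>(source, predicate);
    }

    public static void main(String[] args) {
        SourceStoreSketch<String, Integer> store = new SourceStoreSketch<>();
        TableSketch<String, Integer> table = new TableSketch<>(store, v -> true);
        QueryHandleSketch<String, Integer> filteredHandle =
                table.filter(v -> v > 10).getQueryHandle();
        store.data.put("a", 5);
        store.data.put("b", 20);
        System.out.println(filteredHandle.get("a") + " " + filteredHandle.get("b"));
    }
}
```

In this sketch both handles read the same map, which mirrors the point that the filtered table does not need a second physical store.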

Looking forward to your opinions

Best Jan


#DeathToIQMoreAndBetterConnectors



On 30.01.2017 10:42, Eno Thereska wrote:
Hi there,

The inconsistency will be resolved, whether with materialize or overloaded 
methods.

With the discussion on the DSL & stores I feel we've gone off on a slightly 
different tangent, which is worth discussing nonetheless. We have entered into an 
argument around the scope of the DSL. The DSL has been designed primarily for 
processing. The DSL does not dictate ways to access state stores or what kind of 
queries to perform on them. Hence, I see the mechanism for accessing storage as 
decoupled from the DSL.

We could think of ways to get store handles from part of the DSL, like the 
KTable abstraction. However, subsequent queries will be store-dependent and not 
rely on the DSL, hence I'm not sure we get any grand-convergence DSL-Store 
here. So I am arguing that the current way of getting a handle on state stores 
is fine.

Thanks
Eno

On 30 Jan 2017, at 03:56, Guozhang Wang <wangg...@gmail.com> wrote:

Thinking out loud here about the API options (materialize v.s. overloaded
functions) and their impact on IQ:

1. The first issue of the current DSL is that there is inconsistency in
whether / how KTables should be materialized:

    a) in many cases the library HAS TO materialize KTables no matter what,
e.g. KTables resulting from KStream / KTable aggregations, and hence we force
users to provide store names and throw an RTE if it is null;
    b) in some other cases, the KTable can be materialized or not; for
example in KStreamBuilder.table(), store names can be nullable, in which
case the KTable would not be materialized;
    c) in some other cases, the KTable will never be materialized, for
example KTables resulting from KTable.filter(), and users have no option to
force them to be materialized;
    d) this is related to a), where some KTables are required to be
materialized, but we do not force users to provide a state store name,
e.g. KTables involved in joins; an RTE will be thrown not immediately but
later in this case.

2. The second issue is related to IQ, where state stores are accessed by
their state store names; so only those KTables that have user-specified state
store names will be queryable. But because of 1) above, many stores may not be
of interest to users for IQ but they still need to provide a (dummy?) state
store name for them; while on the other hand users cannot query some state
stores, e.g. the ones generated by KTable.filter(), as there are no APIs for
them to specify a state store name.

3. We are aware from user feedback that such backend details would
better be abstracted away from the DSL layer, where app developers should
just focus on processing logic, while state stores along with their
changelogs etc would better be in a different mechanism; the same arguments
have been discussed for serdes / windowing triggers as well. For serdes
specifically, we had a very long discussion about it and concluded that, at
least in Java7, we cannot completely abstract serde away in the DSL, so we
chose the other extreme and force users to be completely aware of the
serde requirements when some KTables may need to be materialized via
overloaded API functions. While for the state store names, I feel it is a
different argument than serdes (details below).


So to me, for either materialize() v.s. overloaded functions directions,
the first thing I'd like to resolve is the inconsistency issue mentioned
above. So in either case: KTable materialization will not be affected by the
user providing a state store name or not, but will only be decided by the
library when it is necessary. More specifically, only KTables resulting from
the join operator and builder.table() are not always materialized, but are
still likely to be materialized lazily (e.g. when participating in a join
operator).


For overloaded functions that would mean:

    a) we have an overloaded function for ALL operators that could result
in a KTable, and allow the store name to be null (i.e. for the function
without this param it is null by default);
    b) a null state store name does not indicate that a KTable would not be
materialized, but that it will not be used for IQ at all (internal state
store names will be generated when necessary).


For materialize() that would mean:

    a) we will remove state store names from ALL operators that could
result in a KTable.
    b) not calling materialize() on a KTable does not indicate that the KTable
would not be materialized, but that it will not be used for IQ at all
(internal state store names will be generated when necessary).


Again, in either case the API itself does not "hint" about anything for
materializing a KTable or not at all; it is still purely determined by the
library when parsing the DSL for now.
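The rule shared by both directions can be sketched in a few lines of plain Java. This is only my reading of the two lists above, not library code: `StoreNamingSketch`, `Decision` and the `internal-store-` prefix are hypothetical, and the sketch assumes that a user-provided name is itself a reason for the library to materialize, since IQ needs a store to query.

```java
// Hypothetical helper, not part of Kafka Streams: it only illustrates the rule.
final class StoreNamingSketch {

    static final class Decision {
        final boolean materialized;
        final boolean queryable;
        final String storeName; // null when nothing is materialized

        Decision(boolean materialized, boolean queryable, String storeName) {
            this.materialized = materialized;
            this.queryable = queryable;
            this.storeName = storeName;
        }
    }

    private int nextInternalId = 0;

    // userStoreName: null means "I do not intend to query this table".
    // libraryNeedsStore: true when the operator cannot work without a store.
    Decision decide(String userStoreName, boolean libraryNeedsStore) {
        if (userStoreName != null) {
            // Assumption: naming a store signals IQ usage, which requires a store.
            return new Decision(true, true, userStoreName);
        }
        if (libraryNeedsStore) {
            // Materialized for processing only; the generated name is never exposed.
            return new Decision(true, false, "internal-store-" + nextInternalId++);
        }
        return new Decision(false, false, null);
    }

    public static void main(String[] args) {
        StoreNamingSketch naming = new StoreNamingSketch();
        Decision aggregate = naming.decide(null, true);
        System.out.println(aggregate.materialized + " " + aggregate.queryable);
    }
}
```

The point of the sketch is that the null check only ever influences `queryable`, never whether a store is created for an operator that needs one.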

Following these thoughts, I feel that 1) we should probably change the name
"materialize" since it may mislead users about what actually happens
behind the scenes, to e.g. what Damian suggested, "queryableStore(String
storeName)", which returns a QueryableStateStore, and can replace the
`KafkaStreams.store` function; 2) comparing those two options, assuming we
get rid of the misleading function name, I personally favor not adding more
overloaded functions as it keeps the API simpler.



Guozhang


On Sat, Jan 28, 2017 at 2:32 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

Hi,

thanks for your mail, I felt like this can clarify some things! The thread
unfortunately split, but as all branches close in on what my suggestion was
about, I'll pick this one to continue.

Of course only the table the user wants to query would be materialized
(retrieving the query handle implies materialisation). So in the example of
KTable::filter, if you call getIQHandle on both tables, only the one source
that is there would materialize, and the QueryHandle abstraction would make
sure it gets mapped and filtered and whatnot upon read as usual.

Of course the object you would retrieve would maybe only wrap the
storeName / table unique identifier and a way to access the streams
instance, and then basically use the same mechanism that is currently used.
From my point of view this is the least confusing way for DSL users. If
it's too tricky to get hold of the streams instance, one could ask the user
to pass it in before executing queries, thereby making sure the streams
instance has been built.

The effort to implement this is indeed some orders of magnitude higher
than the overloaded materialized call. As long as I could help getting a
different view I am happy.

Best Jan


On 28.01.2017 09:36, Eno Thereska wrote:

Hi Jan,

I understand your concern. One implication of not passing any store name
and just getting an IQ handle is that all KTables would need to be
materialised. Currently the store name (or the proposed .materialize() call)
acts as a hint on whether to materialise the KTable or not. Materialising
every KTable can be expensive, although there are some tricks one can play,
e.g., have a virtual store rather than one backed by a Kafka topic.

However, even with the above, after getting an IQ handle, the user would
still need to use IQ APIs to query the state. As such, we would still
continue to be outside the original DSL so this wouldn't address your
original concern.

So I read this suggestion as simplifying the APIs by removing the store
name, at the cost of having to materialise every KTable. It's definitely an
option we'll consider as part of this KIP.

Thanks
Eno


On 28 Jan 2017, at 06:49, Jan Filipiak <jan.filip...@trivago.com> wrote:
Hi, exactly.

I know it works from the Processor API, but my suggestion would prevent
DSL users from dealing with store names whatsoever.

In general I am pro switching between DSL and Processor API easily. (In
my Streams applications I do this a lot with reflection and instantiating
KTableImpl.) Concerning this KIP, all I say is that there should be a DSL
concept of "I want to expose this __KTable__". This can be a method like
KTable::retrieveIQHandle():InteractiveQueryHandle; the table would know
to materialize, and the user would have a reference to the store and the
distributed query mechanism through the Interactive Query Handle. Under the
hood it can use the same mechanism as the PAPI people again.

I hope you see my point :)

Best Jan


#DeathToIQMoreAndBetterConnectors :)




On 27.01.2017 21:59, Matthias J. Sax wrote:

Jan,

the IQ feature is not limited to Streams DSL but can also be used for
Stores used in PAPI. Thus, we need a mechanism that does work for PAPI
and DSL.

Nevertheless I see your point and I think we could provide a better API
for KTable stores including the discovery of remote shards of the same
KTable.

@Michael: Yes, right now we do have a lot of overloads and I am not a
big fan of those -- I would rather prefer a builder pattern. But that
might be a different discussion (nevertheless, if we aim for an API
rework, we should get the changes with regard to stores right from the
beginning, in order to avoid a redesign later on).

something like:

stream.groupByKey()
       .window(TimeWindow.of(5000))
       .aggregate(...)
       .withAggValueSerde(new CustomTypeSerde())
       .withStoreName("storeName");


(This would also reduce JavaDoc redundancy -- maybe a personal pain
point right now :))


-Matthias

On 1/27/17 11:10 AM, Jan Filipiak wrote:

Yeah,

Maybe it's my bad that I refuse to look into IQ, as I don't find it anywhere
close to being interesting. The problem IMO is that people need to know
the store name, so we are working on different levels to achieve a
single goal.

What is your opinion on having a method on KTable that returns
something like a key-value store? There are of course problems like
"it can't be used before the stream threads are running and group membership
is established...", but the benefit would be that the user gets a
consistent way of saying "Hey, I need it materialized, as queries are going
to be coming" and, in the same step, already gets a thing he can execute the
queries on.
What I think is unintuitive here is that you need to say materialize on this
KTable, then go somewhere else to find its store name, and then
go to the KafkaStreams instance and ask for the store with this
name.

So one could help the user stay in DSL land and therefore maybe
confuse him less.

Best Jan

#DeathToIQMoreAndBetterConnectors :)



On 27.01.2017 16:51, Damian Guy wrote:

I think Jan is saying that they don't always need to be materialized, i.e.,
filter just needs to apply the ValueGetter, it doesn't need yet another
physical state store.

On Fri, 27 Jan 2017 at 15:49 Michael Noll <mich...@confluent.io>
wrote:

Like Damian, and for the same reasons, I am more in favor of overloading
methods rather than introducing `materialize()`.
FWIW, we already have a similar API setup for e.g.
`KTable#through(topicName, stateStoreName)`.

A related but slightly different question is what e.g. Jan Filipiak
mentioned earlier in this thread:
I think we need to explain more clearly why KIP-114 doesn't propose the
seemingly simpler solution of always materializing tables/state stores.



On Fri, Jan 27, 2017 at 4:38 PM, Jan Filipiak <
jan.filip...@trivago.com>
wrote:

Hi,
Yeah, it's confusing. Why shouldn't it be queryable by IQ? If you use the
ValueGetter of Filter, it will apply the filter and should be completely
transparent as to whether another processor or IQ is accessing it. How can
this new method help?

I cannot see the reason for the additional materialize method being
required! Hence I suggest leaving it alone.
Regarding removing the others, I don't have strong opinions, and it
seems to be unrelated.

Best Jan




On 26.01.2017 20:48, Eno Thereska wrote:

Forwarding this thread to the users list too in case people would like to
comment. It is also on the dev list.
Thanks
Eno

Begin forwarded message:

From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved semantics
Date: 24 January 2017 at 19:30:10 GMT
To: d...@kafka.apache.org
Reply-To: d...@kafka.apache.org

That's not what I meant by "huge impact".

I refer to the actions related to materializing a KTable: creating a
RocksDB store and a changelog topic -- users should be aware of the
runtime implications, and this is better expressed by an explicit method
call, rather than implicitly triggered by using a different overload of
a method.


-Matthias

On 1/24/17 1:35 AM, Damian Guy wrote:

I think your definition of a huge impact and mine are rather different ;-P
Overloading a few methods is not really a huge impact IMO. It is also a
sacrifice worth making for readability, usability of the API.

On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax <matth...@confluent.io> wrote:

I understand your argument, but do not agree with it.

Your first version (even if the "flow" is not as nice) is more explicit
than the second version. Adding a stateStoreName parameter is quite
implicit but has a huge impact -- thus, I prefer the rather more verbose
but explicit version.


-Matthias

On 1/23/17 1:39 AM, Damian Guy wrote:

I'm not a fan of materialize. I think it interrupts the flow, i.e,

table.mapValues(..).materialize().join(..).materialize()
compared to:
table.mapValues(..).join(..)

I know which one I prefer.
My preference is still to provide overloaded methods where people can
specify the store names if they want, otherwise we just generate them.

On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax <matth...@confluent.io> wrote:

Hi,
thanks for the KIP Eno! Here are my 2 cents:

1) I like Guozhang's proposal about removing the store name from all KTable
methods and generating internal names (however, I would do this as
overloads). Furthermore, I would not force users to call .materialize()
if they want to query a store, but add one more method .stateStoreName()
that returns the store name if the KTable is materialized. Thus, also
.materialize() need not necessarily have a parameter storeName (ie, we
should have some overloads here).

I would also not allow providing a null store name (to indicate no
materialization if not necessary) but throw an exception.

This yields some simplification (see below).


2) I also like Guozhang's proposal about KStream#toTable()


3)

    3. What will happen when you call materialize on KTable that is already
    materialized? Will it create another StateStore (providing the name is
    different), throw an Exception?

Currently an exception is thrown, but see below.

If we follow approach (1) from Guozhang, there is no need to worry about
a second materialization and also no exception needs to be thrown. A call to
.materialize() basically sets a "materialized flag" (ie, idempotent
operation) and sets a new name.
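A tiny sketch of what "sets a flag and sets a new name" could look like, in plain Java. `MaterializeFlagSketch` is a hypothetical class, not KTable code; it only shows that a repeated call neither creates a second store nor throws, and that the last name wins.

```java
// Hypothetical model of the "materialized flag" idea; not Kafka Streams code.
final class MaterializeFlagSketch {
    private boolean materialized = false;
    private String storeName = null;

    // Sets the flag and (re)names the store. Calling it again is harmless:
    // there is still only one logical store, and no exception is thrown.
    MaterializeFlagSketch materialize(String name) {
        this.materialized = true;
        this.storeName = name;
        return this;
    }

    boolean isMaterialized() {
        return materialized;
    }

    String storeName() {
        return storeName;
    }

    public static void main(String[] args) {
        MaterializeFlagSketch table = new MaterializeFlagSketch();
        table.materialize("first").materialize("second");
        System.out.println(table.isMaterialized() + " " + table.storeName());
    }
}
```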


4)

Rename toStream() to toKStream() for consistency.

Not sure whether that is really required. We also use
`KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and
don't care about the "K" prefix.

Eno's reply:

I think changing it to `toKStream` would make it absolutely clear what
we are converting it to.

I'd say we should probably change the KStreamBuilder methods (but not in
this KIP).

I would keep #toStream(). (see below)


5) We should not remove any methods but only deprecate them.



A general note:

I do not understand your comments "Rejected Alternatives". You say "Have
the KTable be the materialized view" was rejected. But your KIP actually
does exactly this -- the changelog abstraction of KTable is secondary
after those changes and the "view" abstraction is what a KTable is. And
just to be clear, I like this a lot:

- it aligns with the name KTable
- it aligns with stream-table-duality
- it aligns with IQ

I would say that a KTable is a "view abstraction" (as materialization is
optional).



-Matthias




On 1/22/17 5:05 PM, Guozhang Wang wrote:

Thanks for the KIP Eno, I have a few meta comments and a few detailed
comments:

1. I like the materialize() function in general, but I would like to see
how other KTable functions should be updated accordingly. For example, 1)
KStreamBuilder.table(..) has a state store name parameter, and we will
always materialize the KTable unless its state store name is set to null;
2) KTable.agg requires the result KTable to be materialized, and hence it
also has a state store name; 3) KTable.join requires the joining table to
be materialized. And today we do not actually have a mechanism to enforce
that, but will only throw an exception at runtime if it is not (e.g. if you
have "builder.table("topic", null).join()" a RTE will be thrown).

I'd make an extended proposal just to kick off the discussion here: let's
remove all the state store params in other KTable functions, and if in some
cases a KTable has to be materialized (e.g. a KTable resulting from KXX.agg)
and users do not call materialize(), then we treat it as "users are not
interested in querying it at all" and hence use an internal name generated
for the materialized KTable; i.e. although it is materialized the state
store is not exposed to users. And if users call materialize() afterwards
but we have already decided to materialize it, we can replace the internal
name with the user's provided name. Then from a user's point of view, if
they ever want to query a KTable, they have to call materialize() with a
given state store name. This approach has one awkwardness though, that the
serdes and state store name params are not separated and could overlap
(see detailed comment #2 below).


2. This step does not need to be included in this KIP, but just as a
reference / future work: as we have discussed before, we may enforce
materializing KTable.join resulted KTables as well in the future. If we do
that, then:

a) KXX.agg resulted KTables are always materialized;
b) KTable.agg requires the aggregating KTable to always be materialized
(otherwise we would not know the old value);
c) KTable.join resulted KTables are always materialized, and so are the
joining KTables;
d) KTable.filter/mapValues resulted KTables' materialization depends on
their parent's materialization;

By recursive induction all KTables are actually always materialized, and
then the effect of "materialize()" is just for specifying the state
store names. In this scenario, we do not need to send Change<V> in
repartition topics within joins any more, but only for repartition topics
within aggregations. Instead, we can just send a "tombstone" without the
old value and we do not need to calculate joins twice (one more time when
the old value is received).
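The induction argument can be written down as a small recursive check in plain Java. This is a sketch under assumptions, not how the library tracks materialization: `TableNodeSketch` and its `Kind` enum are hypothetical, and the base case (source tables count as materialized) is my assumption, since rules a) to d) above do not state it explicitly.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical operator-graph node; not a Kafka Streams class.
final class TableNodeSketch {
    enum Kind { SOURCE, AGGREGATE, JOIN, FILTER, MAP_VALUES }

    final Kind kind;
    final List<TableNodeSketch> parents;

    TableNodeSketch(Kind kind, TableNodeSketch... parents) {
        this.kind = kind;
        this.parents = Arrays.asList(parents);
    }

    boolean isMaterialized() {
        switch (kind) {
            case SOURCE:     // assumption: base case, source tables are materialized
            case AGGREGATE:  // rule a): aggregation results are always materialized
            case JOIN:       // rule c): join results are always materialized
                return true;
            default:         // rule d): filter / mapValues follow their parent
                return parents.get(0).isMaterialized();
        }
    }

    public static void main(String[] args) {
        TableNodeSketch source = new TableNodeSketch(Kind.SOURCE);
        TableNodeSketch filtered = new TableNodeSketch(Kind.FILTER, source);
        TableNodeSketch mapped = new TableNodeSketch(Kind.MAP_VALUES, filtered);
        System.out.println(mapped.isMaterialized());
    }
}
```

Every chain of filter / mapValues nodes eventually reaches a source, an aggregation or a join, which is why the recursion always bottoms out in `true` under these rules.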

3. I'm wondering if it is worth-while to add a "KStream#toTable()" function
which is interpreted as a dummy-aggregation where the new value always
replaces the old value. I have seen a couple of use cases of this, for
example, users want to read a changelog topic, apply some filters, and then
materialize it into a KTable with state stores without creating duplicated
changelog topics. With materialize() and toTable I'd imagine users can
specify sth. like:

"
KStream stream = builder.stream("topic1").filter(..);
KTable table = stream.toTable(..);
table.materialize("state1");
"

And the library in this case could set store "state1" 's changelog topic to
be "topic1", and apply the filter on the fly while (re-)storing its
state by reading from this topic, instead of creating a second changelog
topic like "appID-state1-changelog" which is a semi-duplicate of "topic1".

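To illustrate restoring the "state1" store straight from "topic1" with the filter applied on the fly, here is a minimal sketch in plain Java. It is not Kafka Streams code: `RestoreSketch` and `restore` are hypothetical names, the topic is modelled as an in-memory list of key/value entries, and tombstones (null values) are left out for brevity.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical restore routine; the "topic" is just an ordered list of entries.
final class RestoreSketch {

    // Rebuilds the table state by replaying the source topic and filtering on the fly.
    static Map<String, String> restore(List<Map.Entry<String, String>> sourceTopic,
                                       Predicate<String> filter) {
        Map<String, String> state = new HashMap<>();
        for (Map.Entry<String, String> entry : sourceTopic) {
            // Stream-side filter: a rejected record is simply dropped; it does
            // not delete an earlier value stored under the same key.
            if (filter.test(entry.getValue())) {
                // Dummy aggregation: the new value always replaces the old one.
                state.put(entry.getKey(), entry.getValue());
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> topic1 = new ArrayList<>();
        topic1.add(new SimpleImmutableEntry<>("k1", "keep-1"));
        topic1.add(new SimpleImmutableEntry<>("k2", "drop-1"));
        topic1.add(new SimpleImmutableEntry<>("k1", "keep-2"));
        System.out.println(restore(topic1, v -> v.startsWith("keep")));
    }
}
```

Because the state is a pure function of "topic1" and the filter, no second changelog topic is needed in this model; whether that holds once tombstones and compaction are involved is outside what the sketch shows.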
Detailed:
1. I'm +1 with Michael regarding "#toStream"; actually I was thinking about
renaming to "#toChangeLog" but after thinking a bit more I think #toStream
is still better, and we can just mention in the javaDoc that it is
transforming its underlying changelog stream to a normal stream.
2. As Damian mentioned, there are a few scenarios where the serdes are
already specified in a previous operation whereas it is not known before
calling materialize, for example:

stream.groupByKey.agg(serde).materialize(serde) v.s.
table.mapValues(/*no serde specified*/).materialize(serde). We need to
specify what the handling logic is here.
3. We can remove the "KTable#to" call as well, and force users to call
"KTable.toStream.to" to be more clear.


Guozhang


On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska <eno.there...@gmail.com>
wrote:

I think changing it to `toKStream` would make it absolutely clear what we
are converting it to.

I'd say we should probably change the KStreamBuilder methods (but not in
this KIP).
Thanks
Eno

On 17 Jan 2017, at 13:59, Michael Noll <mich...@confluent.io> wrote:

Rename toStream() to toKStream() for consistency.

Not sure whether that is really required. We also use
`KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and
don't care about the "K" prefix.

On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska <eno.there...@gmail.com>
wrote:

Thanks Damian, answers inline:

On 16 Jan 2017, at 17:17, Damian Guy <damian....@gmail.com> wrote:

Hi Eno,

Thanks for the KIP. Some comments:

1. I'd probably rename materialized to materialize.

Ok.

2. I don't think the addition of the new Log compaction mechanism is
necessary for this KIP, i.e, the KIP is useful without it. Maybe that
should be a different KIP?

Agreed, already removed. Will do a separate KIP for that.

3. What will happen when you call materialize on KTable that is already
materialized? Will it create another StateStore (providing the name is
different), throw an Exception?

Currently an exception is thrown, but see below.

4. Have you considered overloading the existing KTable operations to add
a state store name? So if a state store name is provided, then materialize
a state store? This would be my preferred approach as I don't think
materialize is always a valid operation.

Ok I can see your point. This will increase the KIP size since I'll need
to enumerate all overloaded methods, but it's not a problem.

5. The materialize method will need a value Serde as some operations,
i.e., mapValues, join etc can change the value types

6. https://issues.apache.org/jira/browse/KAFKA-4609 - might mean that we
always need to materialize the StateStore for KTable-KTable joins. If that
is the case, then the KTable Join operators will also need Serde
information.

I'll update the KIP with the serdes.
Thanks
Eno


Cheers,

Damian


On Mon, 16 Jan 2017 at 16:44 Eno Thereska <eno.there...@gmail.com> wrote:

Hello,

We created "KIP-114: KTable materialization and improved semantics" to
solidify the KTable semantics in Kafka Streams:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics

Your feedback is appreciated.

Thanks
Eno


--
-- Guozhang





