Thank you Bill,

I think this is reasonable. Do you have any suggestion
for handling oldValues in cases like

builder.table().filter(RichPredicate).join()

where we process a Change with old and new value and dont have a record context for old.

my suggestion would be that instead of

SOURCE -> KTABLESOURCE -> KTABLEFILTER -> JOIN -> SINK

we build

SOURCE  -> KTABLEFILTER ->  KTABLESOURCE -> JOIN -> SINK

We should build a topology like this from the beginning and not have
an optimisation phase afterwards.

Any opinions?

Best Jan




On 05.12.2017 17:34, Bill Bejeck wrote:
Matthias,

Overall I agree with what you've presented here.

Initially, I was hesitant to remove information from the context of the
result records (Joins or Aggregations) with the thought that when there are
unexpected results, the source information would be useful for tracing back
where the error could have occurred.  But in the case of Joins and
Aggregations, the amount of data needed to do meaningful analysis could be
too much. For example, a join result could come from two topics so you'd
need to keep both original topic names, offsets, etc. (plus the broker
could have deleted the records in the interim so even having offset could
provide nothing).

I'm bit long winded here, but I've come full circle to your original
proposal that since Joins and Aggregations produce fundamentally new types,
we drop the corresponding information from the context even in the case of
single topic aggregations.

Thanks,
Bill

On Mon, Dec 4, 2017 at 7:02 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

I agree with Guozhang that just exposing meta data at the source level
might not provide too much value. Furthermore, for timestamps we do
already have a well defined contract and we should exploit it:
timestamps can always be provided in a meaningful way.

Also, for simple operations like KStream-filter/map the contract is
simple and we can just use it. Same for KTable-filter/map (for new values).

For aggregations, join, and oldValue, I could just drop some information
and return `null`/-1, if the result records has no semantically
meaningful meta data.

For example, for aggregations, we could preserve the partition (as all
agg-input-records have the same partition). For single input topic
aggregation (what I guess is the most prominent case), we can also carry
over the topic name (would be a internal repartitioning topic name
often). Offsets don't have any semantic interpretation IMHO and we could
return -1.

For joins, we could keep the partition information. Topic and offset are
both unknown/invalid for the output record IMHO.

For the oldValue case, we can keep partition and for single input topic
case topic name. Timestamp might be -1 for now, but after we added
timestamps to KTable (what we plan to do anyway), we can also return a
valid timestamp. Offset would be -1 again (if we store offset in KTable
too, we could provide all offset as well -- but I don't see too much
value in doing this compared to the storage overhead this implies).


WDYT?


-Matthias

On 11/29/17 4:14 AM, Jan Filipiak wrote:
Hi,

thank you for the summary and thanks for acknowledging that I do have a
point here.

I don't like the second Idea at all. Hence I started of this discussion.

I am just disappointed, back then when we had the discussion about how
to refactor store overload
and IQ handling, I knew the path we are taking is wrong. Having problems
implementing these kinda
features (wich are really simple)  is just a symptom of messed up IQ
implementation. I wish really bad
I could have convinced you guys back then. To be honest with IQ we can
continue here
as we Materialize but would not send oldValue, but with join you're out
of luck with current setup.

I of course recommend to do not introduce any optimizations here. Id
recommend to go towards what
I recommended already back then. So i would't say we need to optimize
anything later we need to build
the topology better in the first place.




On 28.11.2017 21:00, Guozhang Wang wrote:
Jan,

Thanks for your input, I can understand now that the oldValue is also
exposed in user customized `filter` function and hence want record
context
we should expose is a problem. And I think it does brings a good point
to
consider for KIP-159. The discussions maybe a bit confusing to reader
though, and hence I'd like to summarize the status quo and with a
proposal:

In today's Streams DSL, when a KTable is created either from a source
topic, or from an stateful operator, we will materialize the KTable
with a
backing state store; on the other hand, KTables created from a
non-stateful
operator like filter, will not be backed by a state store by default
unless
users indicate so (e.g. using the overloaded function with the queryable
name or store supplier).

For example:

KTable table1 = builder.table("topic");
// a
state store created for table1
KTable table2 = table1.filter(..);
// no state store created for table2
KTable table3 = table1.filter(.., "storeName");                  // a
state
store created for table3
KTable table4 = table1.groupBy(..).aggregate(..);            // a state
store created for table4

Because of that, the filter() operator above on table1 will always be
exposed with oldValue and newValue; Damian's point is that, we may
optimize
the first case such that table1 will only be materialized if users
asked so
(e.g. using the overloaded function with a store supplier), and in which
case, we do not need to pass newValue / oldValue pairs (I think this is
what Jan suggests as well, i.e. do filtering before materializing, so
that
we can have a smaller backed state store as well). But this optimization
does not eliminate the possibilities that we may still need to do
filter if
users does specify "yes I do want to the source KTable itself to be
materialized, please". So the concern about how to expose the record
context in such cases still persists.


With that, regarding to KIP-159 itself, here are my thoughts:

1) if we restrict the scope of exposing record context only to source
KTables / KStreams I felt the KIP itself does not bring much value given
its required API change because only the SourceKStream can safely
maintain
its records context, and for SourceKTable if it is materialized, then
even
non-stateful operators like Join may still have a concern about exposing
the record context.

2) an alternative idea is we provide the semantics on how record context
would be inherited across the operators for KTable / KStream and
expose it
in all operators (similarly in PAPI we would expose a much simpler
contract), and make it as a public contract that Streams library will
guarantee moving forward even we optimize our topology builder; it may
not
align perfectly with the linear algebraic semantics but practically
applicable for most cases; if users semantics do not fit in the provided
contract, then they may need to get this themselves (embed such
information
in the value payload, for example).

If people do not like the second idea, I'd suggest we hold on pursuing
the
first direction since to me its beneficial scope is too limited
compared to
its cost.


Guozhang



On Fri, Nov 24, 2017 at 1:39 AM, Jan Filipiak <jan.filip...@trivago.com
wrote:

Cleary we show the oldValue to the user. We have to, because we filter
after the store.
https://github.com/axbaretto/kafka/blob/master/streams/src/m
ain/java/org/apache/kafka/streams/kstream/internals/
KTableFilter.java#L96

I cannot help you following this. It is really obvious and I am running
out of tools for explaining.

Thanks for understanding my point to put filter before. Not only
would it
make the store smaller. It would make this feature reasonably
possible and
the framework easier. Interestingly it would also help to move IQ
into more
reasonable directions. And it might help understand that we do not
need any
intermediate representation of the topology,

KIP-182 I have no clue what everyone has with their "bytestores" so
broken. But putting another store after doesn't help when the store
before
is the problem.




On 24.11.2017 05:08, Matthias J. Sax wrote:

   From a DSL point of view, users only see the new value on a
KTable#filter anyway. So why should it be an issue that we use
<newValue,oldValue> pair under the hood?

User sees newValue and gets corresponding RecordContext. I can't see
any
issue here?

I cannot follow here:

Even when we have a statefull operation last. We move it to the very
first processor (KtableSource)
and therefore cant present a proper RecordContext.

With regard to `builder.table().filter()`:

I see you point that it would be good to be able to apply the filter()
first to reduce the stat store size of the table. But how is this
related to KIP-159?

Btw: with KIP-182, I am wondering if this would not be possible, by
putting a custom dummy store into the table and materialize the filter
result afterwards? It's not a nice way to do, but seems to be
possible.

-Matthias

On 11/23/17 4:56 AM, Jan Filipiak wrote:

The comment is valid. It falls exactly into this topic, it has
exactly
todo with this!
Even when we have a statefull operation last. We move it to the very
first processor (KtableSource)
and therefore cant present a proper RecordContext.

Regarding the other Jiras you are referring to. They harm the project
more than they do good!
There is no need for this kind of optimizer and meta representation
and
what not. I hope they
never get implemented.

Best Jan


On 22.11.2017 14:44, Damian Guy wrote:

Jan, i think you comment with respect to filtering is valid, though
not for
this KIP. We have separate JIRAs for topology optimization of
which this
falls into.

Thanks,
Damian

On Wed, 22 Nov 2017 at 02:25 Guozhang Wang <wangg...@gmail.com>
wrote:

Jan,
Not sure I understand your argument that "we still going to present
change.oldValue to the filter even though the record context() is
for
change.newValue". Are you referring to `KTableFilter#process()`?
If yes
could you point to me which LOC are you concerning about?


Guozhang


On Mon, Nov 20, 2017 at 9:29 PM, Jan Filipiak <
jan.filip...@trivago.com>
wrote:

a remark of mine that got missed during migration:
There is this problem that even though we have
source.table.filter.join
the state-fullness happens at the table step not a the join
step. In a
filter
we still going to present change.oldValue to the filter even
though
the
record context() is for change.newValue. I would go as far as
applying
the filter before the table processor. Not to just get KIP-159,
but
because

I think its a side effect of a non ideal topology layout. If i can
filter
99% of my
records. my state could be way smaller. Also widely escalates the
context
of the KIP

I can only see upsides of executing the filter first.

Best Jan



On 20.11.2017 22:22, Matthias J. Sax wrote:

I am moving this back to the DISCUSS thread... Last 10 emails were
sent
to VOTE thread.

Copying Guozhang's last summary below. Thanks for this summary.
Very
comprehensive!

It seems, we all agree, that the current implementation of the
context
at PAPI level is ok, but we should not leak it into DSL.

Thus, we can go with (2) or (3), were (3) is an extension to (2)
carrying the context to more operators than just sources. It also
seems,
that we all agree, that many-to-one operations void the context.

I still think, that just going with plain (2) is too
restrictive --
but
I am also fine if we don't go with the full proposal of (3).

Also note, that the two operators filter() and filterNot() don't
modify
the record and thus for both, it would be absolutely valid to
keep
the
context.

I personally would keep the context for at least all one-to-one
operators. One-to-many is debatable and I am fine to not carry
the
context further: at least the offset information is
questionable for
this case -- note thought, that semantically, the timestamp is
inherited
via one-to-many, and I also think this applies to "topic" and
"partition". Thus, I think it's still valuable information we can
carry
downstreams.


-Matthias

Jan: which approach are you referring to as "the approach that is
on the

table would be perfect"?

Note that in today's PAPI layer we are already effectively
exposing the
record context which has the issues that we have been discussing
right
now,
and its semantics is always referring to the "processing
record" at

hand.
More specifically, we can think of processing a record a bit
different:

1) the record traversed the topology from source to sink, it
may be
transformed into new object or even generate multiple new
objects
(think:
branch) along the traversal. And the record context is referring
to
this
processing record. Here the "lifetime" of the record lasts for the
entire
topology traversal and any new records of this traversal is
treated as
different transformed values of this record (this applies to
join
and
aggregations as well).

2) the record being processed is wiped out in the first operator
after
the
source, and NEW records are forwarded to downstream operators.
I.e.

each
record only lives between two adjacent operators, once it
reached the
new
operator it's lifetime has ended and new records are generated.
I think in the past we have talked about Streams under both
context,

and
we
do not have a clear agreement. I agree that 2) is logically more
understandable for users as it does not leak any internal

implementation
details (e.g. for stream-table joins, table record's traversal
ends at
the
join operator as it is only be materialized, while stream
record's
traversal goes through the join operator to further down until
sinks).
However if we are going to interpret following 2) above then
even
for
non-stateful operators we would not inherit record context. What
we're
discussing now, seems to infer a third semantics:

3) a record would traverse "through" one-to-one (non-stateful)

operators,
will "replicate" at one-to-many (non-stateful) operators (think:
"mapValues"
      ) and will "end" at many-to-one (stateful) operators
where NEW

records
will be generated and forwarded to the downstream operators.
Just wanted to lay the ground for discussions so we are all on
the
same
page before chatting more.


Guozhang

On 11/6/17 1:41 PM, Jeyhun Karimov wrote:
Hi Matthias,
Thanks a lot for correcting. It is a leftover from the past
designs

when
punctuate() was not deprecated.
I corrected.

Cheers,
Jeyhun

On Mon, Nov 6, 2017 at 5:30 PM Matthias J. Sax
<matth...@confluent.io>
wrote:

I just re-read the KIP.

One minor comment: we don't need to introduce any deprecated
methods.
Thus, RichValueTransformer#punctuate can be removed completely
instead
of introducing it as deprecated.

Otherwise looks good to me.

Thanks for being so patient!


-Matthias

On 11/1/17 9:16 PM, Guozhang Wang wrote:

Jeyhun,
I think I'm convinced to not do KAFKA-3907 in this KIP. We
should

think
carefully if we should add this functionality to the DSL layer
moving
forward since from what we discovered working on it the
conclusion is

that
it would require revamping the public APIs quite a lot, and
it's
not

clear
if it is a good trade-off than asking users to call process()
instead.
Guozhang
On Wed, Nov 1, 2017 at 4:50 AM, Damian Guy
<damian....@gmail.com>
wrote:

Hi Jeyhun, thanks, looks good.

Do we need to remove the line that says:

        - on-demand commit() feature

Cheers,
Damian

On Tue, 31 Oct 2017 at 23:07 Jeyhun Karimov <
je.kari...@gmail.com>

wrote:
Hi,

I removed the 'commit()' feature, as we discussed. It
simplified
the
overall design of KIP a lot.
If it is ok, I would like to start a VOTE thread.

Cheers,
Jeyhun

On Fri, Oct 27, 2017 at 5:28 PM Matthias J. Sax <
matth...@confluent.io
wrote:

Thanks. I understand what you are saying, but I don't
agree that

but also we need a commit() method
I would just not provide `commit()` at DSL level and
close the
corresponding Jira as "not a problem" or similar.


-Matthias

On 10/27/17 3:42 PM, Jeyhun Karimov wrote:

Hi Matthias,
Thanks for your comments. I agree that this is not the
best
way

to
do.
A

bit of history behind this design.
Prior doing this, I tried to provide ProcessorContext
itself
as

an
argument
in Rich interfaces. However, we dont want to give users
that
flexibility
and “power”. Moreover, ProcessorContext contains processor
level

information and not Record level info. The only thing we
need ij
ProcessorContext is commit() method.

So, as far as I understood, we need recor context (offset,
timestamp

and
etc) but also we need a commit() method ( we dont want to
provide

ProcessorContext as a parameter so users can use

ProcessorContext.commit()
).
As a result, I thought to “propagate” commit() call from

RecordContext
to

ProcessorContext() .
If there is a misunderstanding in motvation/discussion of

KIP/included
jiras please let me know.

Cheers,
Jeyhun


On Fri 27. Oct 2017 at 12:39, Matthias J. Sax <
matth...@confluent.io

wrote:
I am personally still not convinced, that we should add
`commit()`
at
all.
@Guozhang: you created the original Jira. Can you
elaborate a
little
bit? Isn't requesting commits a low level API that should
not be

exposed
in the DSL? Just want to understand the motivation
better. Why

would
anybody that uses the DSL ever want to request a commit? To
me,
requesting commits is useful if you manipulated state

explicitly,
ie,
via Processor API.
Also, for the solution: it seem rather unnatural to me,
that we
add
`commit()` to `RecordContext` -- from my understanding,

`RecordContext`
is an helper object that provide access to record meta
data.
Requesting

a commit is something quite different. Additionally, a
commit
does

not

commit a specific record but a `RecrodContext` is for a
specific

record.
To me, this does not seem to be a sound API design if we
follow

this
path.
-Matthias



On 10/26/17 10:41 PM, Jeyhun Karimov wrote:

Hi,
Thanks for your suggestions.

I have some comments, to make sure that there is no

misunderstanding.
1. Maybe we can deprecate the `commit()` in
ProcessorContext,

to
enforce
user to consolidate this call as

"processorContext.recordContext().commit()". And
internal
implementation
of
`ProcessorContext.commit()` in `ProcessorContextImpl` is
also

changed
to
this call.
- I think we should not deprecate
`ProcessorContext.commit()`.
The

main
intuition that we introduce `commit()` in
`RecordContext` is
that,

`RecordContext` is the one which is provided in Rich
interfaces.
So
if
user
wants to commit, then there should be some method inside
`RecordContext`
to

do so. Internally, `RecordContext.commit()` calls
`ProcessorContext.commit()`  (see the last code
snippet in

KIP-159):
@Override
         public void process(final K1 key, final V1 value) {
             recordContext = new RecordContext()
{               //
recordContext initialization is added in this KIP
                 @Override
                 public void commit() {
                     context().commit();
                 }

                 @Override
                 public long offset() {
                     return
context().recordContext().offs
et();
                 }

                 @Override
                 public long timestamp() {
                     return
context().recordContext().timestamp();
                 }

                 @Override
                 public String topic() {
                     return
context().recordContext().topi
c();
                 }

                 @Override
                 public int partition() {
                     return
context().recordContext().partition();
                 }
           };


So, we cannot deprecate `ProcessorContext.commit()` in
this

case
IMO.
2. Add the `task` reference to the impl class,

`ProcessorRecordContext`,
so

that it can implement the commit call itself.
- Actually, I don't think that we need `commit()` in
`ProcessorRecordContext`. The main intuition is to
"transfer"
`ProcessorContext.commit()` call to Rich interfaces, to
support
user-specific committing.
      To do so, we introduce `commit()` method in
`RecordContext()`
just

only
to

call ProcessorContext.commit() inside. (see the above
code
snippet)
So, in Rich interfaces, we are not dealing with

`ProcessorRecordContext`
at all, and we leave all its methods as it is.

In this KIP, we made `RecordContext` to be the parent
class of
`ProcessorRecordContext`, just because of they share
quite

amount
of
methods and it is logical to enable inheritance between
those
two.

3. In the wiki page, the statement that "However, call to a
commit()
method,
is valid only within RecordContext interface (at least for
now),
we
throw
an exception in ProcessorRecordContext.commit()." and the
code
snippet

below would need to be updated as well.
- I think above explanation covers this as well.
I want to gain some speed to this KIP, as it has gone
though

many
changes
based on user/developer needs, both in

documentation-/implementation-wise.

Cheers,
Jeyhun



On Tue, Oct 24, 2017 at 1:41 AM Guozhang Wang <
wangg...@gmail.com>

wrote:
Thanks for the information Jeyhun. I had also forgot
about
KAFKA-3907

with
this KIP..
Thinking a bit more, I'm now inclined to go with what
we
agreed
before,
to
add the commit() call to `RecordContext`. A few minor
tweaks on
its

implementation:
1. Maybe we can deprecate the `commit()` in
ProcessorContext,

to
enforce
user to consolidate this call as
"processorContext.recordContext().commit()". And
internal
implementation
of
`ProcessorContext.commit()` in `ProcessorContextImpl` is
also

changed
to
this call.
2. Add the `task` reference to the impl class,
`ProcessorRecordContext`, so
that it can implement the commit call itself.

3. In the wiki page, the statement that "However,
call to a

commit()
method,
is valid only within RecordContext interface (at least for
now),
we
throw
an exception in ProcessorRecordContext.commit()." and the
code
snippet

below would need to be updated as well.
Guozhang

On Mon, Oct 23, 2017 at 1:40 PM, Matthias J. Sax <

matth...@confluent.io
wrote:
Fair point. This is a long discussion and I totally
forgot
that
we
discussed this.
Seems I changed my opinion about including KAFKA-3907...
Happy to hear what others think.


-Matthias

On 10/23/17 1:20 PM, Jeyhun Karimov wrote:

Hi Matthias,
It is probably my bad, the discussion was a bit
long in
this

thread. I
proposed the related issue in the related KIP discuss
thread
[1]

and
got
an
approval [2,3].
Maybe I misunderstood.

[1]
http://search-hadoop.com/m/Kaf
ka/uyzND19Asmg1GKKXT1?subj=

Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+
Streams
[2]
http://search-hadoop.com/m/Kaf
ka/uyzND1kpct22GKKXT1?subj=

Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+
Streams
[3]
http://search-hadoop.com/m/
Kafka/uyzND1G6TGIGKKXT1?subj=

Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+
Streams
On Mon, Oct 23, 2017 at 8:44 PM Matthias J. Sax <
matth...@confluent.io
wrote:
Interesting.
I thought that https://issues.apache.org/

jira/browse/KAFKA-4125
is
the
main motivation for this KIP :)
I also think, that we should not expose the full
ProcessorContext
at
DSL
level.
Thus, overall I am not even sure if we should fix
KAFKA-3907
at
all.
Manual commits are something DSL users should not
worry
about

--
and
if
one really needs this, an advanced user can still insert
a
dummy

`transform` to request a commit from there.
-Matthias
On 10/18/17 5:39 AM, Jeyhun Karimov wrote:

Hi,
The main intuition is to solve [1], which is part
of
this
KIP.
I agree with you that this might not seem
semantically
correct

as
we
are
not committing record state.
Alternatively, we can remove commit() from
RecordContext
and
add
ProcessorContext (which has commit() method) as an
extra
argument
to
Rich
methods:
instead of
public interface RichValueMapper<V, VR, K> {
         VR apply(final V value,
                  final K key,
                  final RecordContext
recordContext);
}

we can adopt

public interface RichValueMapper<V, VR, K> {
         VR apply(final V value,
                  final K key,
                  final RecordContext recordContext,
                  final ProcessorContext
processorContext);
}


However, in this case, a user can get confused as

ProcessorContext
and
RecordContext share some methods with the same name.
Cheers,
Jeyhun


[1] https://issues.apache.org/
jira/browse/KAFKA-3907

On Tue, Oct 17, 2017 at 3:19 AM Guozhang Wang <

wangg...@gmail.com
wrote:
Regarding #6 above, I'm still not clear why we would
need
`commit()`

in
both ProcessorContext and RecordContext, could you
elaborate

a
bit
more?
To me `commit()` is really a processor context not a
record

context
logically: when you call that function, it means we
would

commit
the
state
of the whole task up to this processed record, not only
that
single
record
itself.
Guozhang

On Mon, Oct 16, 2017 at 9:19 AM, Jeyhun Karimov <

je.kari...@gmail.com
wrote:
Hi,
Thanks for the feedback.


0. RichInitializer definition seems missing.



- Fixed.


      I'd suggest moving the key parameter in the

RichValueXX
and
RichReducer
after the value parameters, as well as in the
templates;

e.g.
public interface RichValueJoiner<V1, V2, VR, K> {
         VR apply(final V1 value1, final V2 value2,
final K
key,

final
RecordContext
recordContext);
}

- Fixed.
2. Some of the listed functions are not necessary
since

their
pairing
APIs
are being deprecated in 1.0 already:
<KR> KGroupedStream<KR, V> groupBy(final

RichKeyValueMapper<?
super
K,
?
super V, KR> selector,
                                        final
Serde<KR>

keySerde,
                                        final
Serde<V>
valSerde);
<VT, VR> KStream<K, VR> leftJoin(final KTable<K,
VT>
table,

                                      final
RichValueJoiner<?
super
K,
?
super
V,
? super VT, ? extends VR> joiner,
                                      final
Serde<K>
keySerde,
                                      final
Serde<V>
valSerde);

-Fixed
3. For a few functions where we are adding
three APIs

for
a




Reply via email to