Finished my deeper scan of your approach.
Most of the comments I left on the PR are minor code style things.
One forward call seems to be a bug though; it would be great if you could double-check it.

The one problem I see is that the high-watermark store grows unbounded.
Deleting a key from the source table does not delete it from the watermark store.
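A minimal sketch of what pruning could look like, assuming the store maps each primary key to the highest source offset already forwarded, and that every join result carries the offset of the source update it derives from. The class and method names are hypothetical, not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical high-water-mark store: primary key -> highest source offset already forwarded.
final class HighWaterMarkStore {
    private final Map<String, Long> highWaterMarks = new HashMap<>();

    /**
     * Returns true if a join result derived from the source update at {@code offset}
     * should be forwarded, updating or pruning the stored mark as a side effect.
     */
    boolean admit(String primaryKey, long offset, boolean sourceRowDeleted) {
        Long current = highWaterMarks.get(primaryKey);
        if (current != null && offset < current) {
            return false; // stale: a result from a newer source update was already forwarded
        }
        if (sourceRowDeleted) {
            // The source row is gone, so drop its mark instead of keeping it forever.
            highWaterMarks.remove(primaryKey);
        } else {
            highWaterMarks.put(primaryKey, offset);
        }
        return true;
    }

    int size() {
        return highWaterMarks.size();
    }
}
```

The trade-off: once a mark is pruned, a stale result older than the delete that arrives afterwards would be admitted again, so pruning immediately is only safe if such late results cannot occur (or if the mark is kept for some grace period first).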

I also don't quite grasp why it's needed. I think the whole offset part can go away? It seems to deal with node failures of some kind, but shouldn't everything turn out okay without it?

Best Jan


On 01.09.2018 20:44, Guozhang Wang wrote:
Yes Adam, that makes sense.

I think it may be better to have a working PR to review before we complete
the VOTE thread. In my previous experience, a large feature like this is
almost certainly going to miss some devils in the details during the design
and wiki discussion phases.

That would unfortunately mean that your implementation may need to be
modified / updated along with the review and further KIP discussion. I
understand this can be painful, but it may be the best option we have to
avoid wasting as much work as possible.


Guozhang


On Wed, Aug 29, 2018 at 10:06 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:

Hi Guozhang

By workflow I mean just the overall process of how the KIP is implemented.
Any ideas on ways to reduce the topic count and materializations, whether there
is a better way to resolve out-of-order updates than a high-water mark table, whether the
design philosophy of "keep everything encapsulated within the join
function" is appropriate, etc. I can implement the changes that John
suggested, but if my overall workflow is not acceptable I would rather
address that before making minor changes.

If this requires a full candidate PR ready to go to prod then I can make
those changes. Hope that clears things up.

Thanks

Adam

On Aug 29, 2018, at 12:42 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Adam,

What do you mean by "additional comments on the workflow"? Do you mean to let others review your PR https://github.com/apache/kafka/pull/5527 ? Is it ready for review?


Guozhang

On Tue, Aug 28, 2018 at 5:00 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:

Okay, I will implement John's suggestion of namespacing the external
headers prior to processing, and then removing the namespacing prior to
emitting. A potential future KIP could be to provide this namespacing
automatically.

I would also appreciate any additional comments on the workflow. My goal is to suss out agreement before moving to a vote.

On Mon, Aug 27, 2018 at 3:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:

I like John's idea as well: for this KIP specifically, as we do not expect any other consumers to read the repartition topics externally, we can prefix the header to be safe while keeping the additional cost low (note that the header field is per-record, so every additional byte is paid on every record as well).
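To put a rough number on the per-record cost: header keys are stored as UTF-8 strings on every record, so a prefix adds its byte length once per prefixed header per record (before compression). The helper below is hypothetical and only does that arithmetic; for a million records carrying one prefixed header each, "internal." costs 9,000,000 extra bytes versus 1,000,000 for a one-character sigil such as "_".

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: extra bytes a header-key prefix adds, ignoring compression.
final class HeaderPrefixOverhead {
    /** Extra UTF-8 bytes the prefix adds to one header key on one record. */
    static int prefixBytes(String prefix) {
        return prefix.getBytes(StandardCharsets.UTF_8).length;
    }

    /** Extra bytes across {@code records} records, each carrying {@code headersPerRecord} prefixed headers. */
    static long totalOverhead(String prefix, long records, int headersPerRecord) {
        return (long) prefixBytes(prefix) * records * headersPerRecord;
    }
}
```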


Guozhang

On Tue, Aug 21, 2018 at 11:58 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:

Hi John

That is an excellent idea. The header usage I propose would be limited entirely to internal topics, and this could very well be the solution to potential conflicts. If we do not officially reserve a "__" prefix, then I think this would be the safest approach, as it would entirely avoid accidents (for example, if a company is using its own "__" prefix for other reasons).

Thanks

Adam


On Tue, Aug 21, 2018 at 2:24 PM, John Roesler <j...@confluent.io> wrote:
Just a quick thought regarding headers:
I think there is no absolutely safe way to avoid conflicts, but we can still consider using some name patterns to reduce the likelihood as much as possible, e.g. something like the internal topic naming: "__internal_[name]"?
I think there is a safe way to avoid conflicts, since these headers are only needed in internal topics (I think):
For internal and changelog topics, we can namespace all headers:
* user-defined headers are namespaced as "external." + headerKey
* internal headers are namespaced as "internal." + headerKey

This is a lot of characters, so we could use a sigil instead (e.g., "_" for internal, "~" for external).

We simply apply the namespacing when we read user headers from external topics into the topology and then de-namespace them before we emit them to an external topic (via "to" or "through").
Now it is not possible to collide with user-defined headers.
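A minimal sketch of the namespacing described above, using plain Map.Entry pairs in place of Kafka's Headers type so it runs standalone. The class and method names are hypothetical, and dropping internal headers on emit is an assumption (the email only says user headers are de-namespaced before emitting).

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper; Map.Entry<String, String> stands in for Kafka's Header type.
final class HeaderNamespacing {
    static final String EXTERNAL = "external.";
    static final String INTERNAL = "internal.";

    private static Map.Entry<String, String> header(String key, String value) {
        // SimpleImmutableEntry allows null values, as Kafka headers do.
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    /** Applied when records enter the topology from an external (user) topic. */
    static List<Map.Entry<String, String>> namespaceUserHeaders(List<Map.Entry<String, String>> userHeaders) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> h : userHeaders) {
            out.add(header(EXTERNAL + h.getKey(), h.getValue()));
        }
        return out;
    }

    /** Headers written by Streams itself always carry the internal prefix. */
    static Map.Entry<String, String> internalHeader(String key, String value) {
        return header(INTERNAL + key, value);
    }

    /**
     * Applied before emitting to an external topic (e.g. via "to" or "through"):
     * strips the external prefix and, as an assumption, drops internal headers.
     */
    static List<Map.Entry<String, String>> denamespace(List<Map.Entry<String, String>> headers) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> h : headers) {
            if (h.getKey().startsWith(EXTERNAL)) {
                out.add(header(h.getKey().substring(EXTERNAL.length()), h.getValue()));
            }
        }
        return out;
    }
}
```

With this scheme a user header named "foreignKeyOffset" and an internal one with the same name coexist inside the topology as "external.foreignKeyOffset" and "internal.foreignKeyOffset", and only the user's header leaves it.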

That said, I'd also be fine with just reserving "__" as a header prefix and not worrying about collisions.

Thanks for the KIP,
-John

On Tue, Aug 21, 2018 at 9:48 AM Jan Filipiak <jan.filip...@trivago.com> wrote:

Still haven't completely grasped it.
Sorry, will read more.

On 17.08.2018 21:23, Jan Filipiak wrote:
Cool stuff.

I made some random remarks. Did not touch the core of the algorithm yet.
Will do Monday 100%.

I don't see Interactive Queries :) like that!




On 17.08.2018 20:28, Adam Bellemare wrote:
I have submitted a PR with my code against trunk:
https://github.com/apache/kafka/pull/5527

Do I continue on this thread or do we begin a new one for discussion?

On Thu, Aug 16, 2018 at 1:40 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Even before message headers, the option for me always existed to just wrap the messages into my own custom envelope. So I of course thought this through. One sentence in your last email triggered again all the thought process I put in back then to design it in what I think is the "Kafka way". It ended up as a bit of a rant about what happened in the past.

I see plenty of colleagues of mine falling into traps in the API that I did warn about in the 1.0 DSL rewrite. I have the same feeling again. So I hope it gives you some insight into my thought process. I am aware that since I never ported 213 to later Streams versions, I don't really have a stake here, and initially I didn't feel like actually sending it. But maybe you can pull something good from it.

Best Jan



On 15.08.2018 04:44, Adam Bellemare wrote:

@Jan
Thanks Jan. I take it you mean "key-widening" somehow includes information about which record is processed first? I understand about a CombinedKey with both the foreign and primary key, but I don't see how you track ordering metadata in there unless you actually included a metadata field in the key type as well.

@Guozhang
As Jan mentioned earlier, are Record Headers meant to be used strictly in user space? It seems possible that a collision could occur on the (key, value) tuple I wish to add. For instance, if I wanted to add ("foreignKeyOffset", 10) to the Headers but the user had already specified their own header with the same key name, then it appears there would be a collision. (This is one of the issues I brought up in the KIP.)

--------------------------------

I will be posting a prototype PR against trunk within the next day or two.
One thing I need to point out is that my design very strictly wraps the entire foreignKeyJoin process within the DSL function. There is no exposure of CombinedKeys or widened keys, nothing to resolve with regard to out-of-order processing, and no need for the DSL user to even know what's going on inside the function. The code simply returns the results of the join, keyed by the original key. Currently my API mirrors exactly the format of the data returned by the regular join function, and I believe that this is very useful to many users of the DSL. It is my understanding that one of the main design goals of the DSL is to provide higher-level functionality without requiring users to know exactly what's going on under the hood. With this in mind, I thought it best to solve ordering and partitioning problems within the function and eliminate the need for users to do additional work after the fact to resolve the results of their join. Basically, I am assuming that most users of the DSL just "want it to work" and want it to be easy. I did this operating under the assumption that if a user truly wants to optimize their own workflow down to the finest details, they will break from strictly using the DSL and move down to the Processor API.

I think the abstraction is not powerful enough to keep Kafka specifics from leaking up. The leak I currently see is that you cannot reliably prevent the delete from coming out first, before you emit the correct new record, as it is an abstraction entirely around Kafka.
I can only recommend against it. Honesty and simplicity should always be the first priority; trying to hide this just makes it more complex and less understandable, and will lead to mistakes in usage.

That is exactly why I am also strongly against GraphNodes and later optimization stages.
Can someone give me an example of an optimisation that really can't be handled by the user constructing his topology differently?
Having reusable Processor API components accessible from the DSL and composable as one likes is exactly where the DSL should max out, and KSQL should take the next step.
I find it very unprofessional, from a software engineering point of view, to run software where you cannot at least sensibly reason about the inner workings of the libraries used.
Given that people have to read and understand this anyway, why try to hide it?

I really miss the beauty of the 0.10 DSL.
Apparently not something I can influence, only warn about.

@Guozhang
You can't imagine how many extra IQ state stores I constantly prune from Streams apps because people just keep passing Materialized's into all the operations. :D :'-(
I regret that I couldn't convince you guys back then. Plus this whole topology as a floating interface chain: never seen it anywhere :-/ :'(

I don't know. I guess this is just me regretting that I only have 24 hours a day.


I updated the KIP today with some points worth talking about, should anyone be so inclined to check it out. Currently we are running this code in production to handle relational joins from our Kafka Connect topics, as per the original motivation of the KIP.

I believe the foreignKeyJoin should be responsible for. In my

On Tue, Aug 14, 2018 at 5:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Adam,
As for your question regarding GraphNodes, it is for extending the Streams optimization framework. You can find more details on https://issues.apache.org/jira/browse/KAFKA-6761.

The main idea is that instead of directly building up the "physical topology" (represented as Topology in the public package, and internally built as the ProcessorTopology class) while users are specifying the transformation operators, we first keep it as a "logical topology" (represented as GraphNode inside InternalStreamsBuilder), and then only execute the optimization and the construction of the "physical" Topology when StreamsBuilder.build() is called.

Back to your question, I think it makes more sense to add a new type of StreamsGraphNode (maybe you can consider inheriting from the BaseJoinProcessorNode). Note that although in the Topology we will have multiple connected ProcessorNodes to represent a (foreign-key) join, we still want to keep it as a single StreamsGraphNode, or just a couple of them, in the logical representation, so that in the future we can construct the physical topology differently (e.g. using another strategy than the current distributed hash-join).
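A toy model of the logical-versus-physical split, with hypothetical names (these are not the real StreamsGraphNode or ProcessorTopology classes): the foreign-key join stays a single logical node, and only at build time is it expanded into physical processors. The four physical steps follow the workflow described elsewhere in this thread (repartition by foreign key, join, repartition back, out-of-order resolution); the alternative branch is only a placeholder for "another way than the distributed hash-join".

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: hypothetical names, not Kafka Streams internals.
interface LogicalNode {
    List<String> toPhysicalProcessors();
}

/** One logical foreign-key join node; its physical plan is chosen when build() runs. */
final class ForeignKeyJoinNode implements LogicalNode {
    private final String name;
    private final boolean useDistributedHashJoin;

    ForeignKeyJoinNode(String name, boolean useDistributedHashJoin) {
        this.name = name;
        this.useDistributedHashJoin = useDistributedHashJoin;
    }

    @Override
    public List<String> toPhysicalProcessors() {
        List<String> processors = new ArrayList<>();
        if (useDistributedHashJoin) {
            processors.add(name + "-repartition-by-foreign-key");
            processors.add(name + "-prefix-scan-join");
            processors.add(name + "-repartition-by-primary-key");
            processors.add(name + "-resolve-out-of-order");
        } else {
            // Placeholder for some alternative strategy; purely illustrative.
            processors.add(name + "-alternative-join");
        }
        return processors;
    }
}

final class ToyBuilder {
    private final List<LogicalNode> logicalGraph = new ArrayList<>();

    ToyBuilder add(LogicalNode node) {
        logicalGraph.add(node);
        return this;
    }

    /** Optimization passes would rewrite logicalGraph here, before expansion. */
    List<String> build() {
        List<String> physical = new ArrayList<>();
        for (LogicalNode node : logicalGraph) {
            physical.addAll(node.toPhysicalProcessors());
        }
        return physical;
    }
}
```

Because the join stays one logical node until build(), switching strategies changes only the expansion, not how users declared the join.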

-------------------------------------------------------

Back to your questions on KIP-213, I think Jan has summarized it pretty well. Note that back then we did not have headers support, so we had to use such a "key-widening" approach to ensure ordering.


Guozhang



On Mon, Aug 13, 2018 at 11:39 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi Adam,
I love how you are on to this already! I resolve this by "key-widening": I treat the results for FK A and FK B differently.
As you can see, the output of my join has a combined key, and therefore I can resolve the "race condition" in a group-by if I so desire.

I think this better reflects what happens under the hood and makes it clearer to the user what is going on. The idea of hiding this behind metadata and handling it in the DSL is, from my POV, not ideal.

To put it in terms of your example:

(key + A, null)
(key + B, <joined On FK = B>)

is what my output would look like.
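A minimal sketch of why key-widening sidesteps the race, assuming results are materialized into a table keyed by a hypothetical CombinedKey(primaryKey, foreignKey) where a null value deletes the entry. Because (key + A) and (key + B) are distinct keys, applying the two results in either order leaves the same table.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical widened key: the primary key plus the foreign key it was joined on. */
final class CombinedKey {
    final String primaryKey;
    final String foreignKey;

    CombinedKey(String primaryKey, String foreignKey) {
        this.primaryKey = primaryKey;
        this.foreignKey = foreignKey;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof CombinedKey)) return false;
        CombinedKey other = (CombinedKey) o;
        return primaryKey.equals(other.primaryKey) && foreignKey.equals(other.foreignKey);
    }

    @Override
    public int hashCode() {
        return Objects.hash(primaryKey, foreignKey);
    }
}

final class WidenedJoinResult {
    final CombinedKey key;
    final String value; // null is a tombstone

    WidenedJoinResult(CombinedKey key, String value) {
        this.key = key;
        this.value = value;
    }

    /** Applies results in the given order to a table keyed by CombinedKey; null deletes the entry. */
    static Map<CombinedKey, String> materialize(List<WidenedJoinResult> results) {
        Map<CombinedKey, String> table = new HashMap<>();
        for (WidenedJoinResult r : results) {
            if (r.value == null) {
                table.remove(r.key);
            } else {
                table.put(r.key, r.value);
            }
        }
        return table;
    }
}
```

In this example only (key + B) survives, so a downstream group-by on the primary key sees a single live entry; how to collapse several live entries per key is left open, as in the email.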


Hope that makes sense :D

Best Jan





On 13.08.2018 18:16, Adam Bellemare wrote:

Hi Jan
If you do not use headers or other metadata, how do you ensure that changes to the foreign-key value are not resolved out-of-order?
i.e. if an event has FK = A, but you change it to FK = B, you need to propagate both a delete (FK=A -> null) and an addition (FK=B). In my solution, without maintaining any metadata, it is possible for the final output to be in either order: the correctly updated joined value, or the null for the delete.
(key, null)
(key, <joined On FK =B>)

or

(key, <joined On FK =B>)
(key, null)
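The two orderings above can be reproduced with a result table keyed only by the primary key (a hypothetical illustration, not code from the PR): applying (key, null) then (key, joined) leaves the joined value, while the reverse order leaves the key deleted.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration: join results materialized into a table keyed only by the primary key. */
final class PrimaryKeyedResults {
    private final Map<String, String> table = new HashMap<>();

    /** Applies one join result; a null value is a tombstone that deletes the row. */
    void apply(String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    String get(String key) {
        return table.get(key);
    }
}
```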

I looked back through your code and through the discussion threads, and didn't see any information on how you resolved this. I have a version of my code working for 2.0; I am just adding more integration tests and will update the KIP accordingly. Any insight you could provide on resolving out-of-order semantics without metadata would be helpful.

Thanks
Adam


On Mon, Aug 13, 2018 at 3:34 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi,

Happy to see that you want to make an effort here.

Regarding the ProcessorSuppliers, I couldn't find a way to avoid rewriting the joiners + the merger.
The repartitioners can be reused in theory. I don't know if repartitioning is optimized in 2.0 now.
I made this https://cwiki.apache.org/confluence/display/KAFKA/KIP-241+KTable+repartition+with+compacted+Topics back then, and we are running KIP-213 with KIP-241 in combination.
For us it is vital, as it minimized the size of our repartition topics, plus it removed the factor of 2 in events on every message.
I know about this new "delete once consumer has read it". I don't think 241 is vital for all use cases; for ours it is. I wanted to use 213 to sneak in the foundations for 241 as well.

I don't quite understand what a PropagationWrapper is, but I am certain that you do not need RecordHeaders for 213, and I would try to leave them out. They belong either to the DSL or to the user; mixed use is to be avoided. We run the join with the 0.8 log format and I don't think one needs more.

This KIP will be very valuable for the Streams project! I could never convince myself to invest in the 1.0+ DSL, as I used almost all my energy fighting against it. Maybe this can also help me see its good sides a little bit more.

If there is anything unclear in all the text that has been written, feel free to cc me directly so I don't miss it on the mailing list.

Best Jan





On 08.08.2018 15:26, Adam Bellemare wrote:

More follow-up, and +dev, as Guozhang previously replied to me directly.

I am currently porting the code over to trunk. One of the major changes since 1.0 is the usage of GraphNodes. I have a question about this: for a foreignKey joiner, should it have its own dedicated node type? Or would it be advisable to construct it from existing GraphNode components? For instance, I believe I could construct it from several OptimizableRepartitionNodes, some SinkNodes, some SourceNodes, and several StatefulProcessorNodes. That being said, there is some underlying complexity to each approach.

I will be switching KIP-213 to use the RecordHeaders in Kafka Streams instead of the PropagationWrapper, but conceptually it should be the same.

Again, any feedback is welcomed...


On Mon, Jul 30, 2018 at 9:38 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:

Hi Guozhang et al

I was just reading the 2.0 release notes and noticed a section on Record Headers.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API
I am not yet sure if the contents of a RecordHeader are propagated all the way through the Sinks and Sources, but if they are, and if they remain attached to the record (including null records), I may be able to ditch the propagationWrapper for an implementation using RecordHeader. I am not yet sure if this is doable, so if anyone understands the RecordHeader implementation better than I do, I would be happy to hear from you.

In the meantime, let me know of any questions. I believe this PR has a lot of potential to solve problems for other people, as I have encountered a number of other companies in the wild all home-brewing their own solutions to come up with a method of handling relational data in streams.
Adam


On Fri, Jul 27, 2018 at 1:45 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Adam,

Thanks for rebooting the discussion of this KIP! Let me finish my pass on the wiki and get back to you soon. Sorry for the delays.
Guozhang

On Tue, Jul 24, 2018 at 6:08 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:

Let me kick this off with a few starting points that I would like to generate some discussion on.
1) It seems to me that I will need to repartition the data twice: once on the foreign key, and once back to the primary key. Is there anything I am missing here?

2) I believe I will also need to materialize 3 state stores: the prefixScan SS, the high-water mark SS (for out-of-order resolution) and the final state store, due to the workflow I have laid out. I have not thought of a better way yet, but would appreciate any input on this matter. I have gone back through the mailing list for the previous discussions on this KIP, and I did not see anything relating to resolving out-of-order compute. I cannot see a way around the current three-SS structure that I have.
3) Caching is disabled on the prefixScan SS, as I do not know how to reconcile the iterator obtained from RocksDB with that of the cache. In addition, I must ensure everything is flushed before scanning. Since the materialized prefixScan SS is under the "control" of the function, I do not anticipate this to be a problem. Throughput will need to be tested, but as Jan observed in his initial overview of this issue, it is generally a surge of output events that affects performance, more so than the flush or the prefixScan itself.
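A minimal in-memory sketch of the prefixScan idea from items 2) and 3), with a TreeMap standing in for the RocksDB-backed store. Keys are hypothetical foreignKey + separator + primaryKey strings, so all primary keys referencing one foreign key form a single contiguous sorted range that one scan can walk. The separator character and class name are assumptions, not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for the prefixScan state store: keys are foreignKey + SEP + primaryKey. */
final class PrefixScanStore {
    private static final char SEP = '\u0000'; // assumed never to occur inside a key
    private final NavigableMap<String, String> store = new TreeMap<>();

    void put(String foreignKey, String primaryKey, String value) {
        store.put(foreignKey + SEP + primaryKey, value);
    }

    void delete(String foreignKey, String primaryKey) {
        store.remove(foreignKey + SEP + primaryKey);
    }

    /** Returns the primary keys currently referencing foreignKey, in sorted order. */
    List<String> primaryKeysFor(String foreignKey) {
        String prefix = foreignKey + SEP;
        List<String> result = new ArrayList<>();
        // All keys sharing a prefix are contiguous in sorted order, starting at the prefix itself.
        for (Map.Entry<String, String> e : store.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break;
            }
            result.add(e.getKey().substring(prefix.length()));
        }
        return result;
    }
}
```

The separator keeps foreign key "A" from matching entries for "AB"; in a real store the equivalent would be a byte-level key encoding, and any cached writes would need to be visible to the scan, which is the flushing concern in item 3).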

Thoughts on any of these are greatly appreciated, since these elements are really the cornerstone of the whole design. I can put up the code I have written against 1.0.2 if we so desire, but first I was hoping to just tackle some of the fundamental design proposals.
Thanks,
Adam



On Mon, Jul 23, 2018 at 10:05 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:

Here is the new discussion thread for KIP-213. I picked the KIP back up, as this is something that we at Flipp are now also running in production.

Jan started this last year, and I know that Trivago is also using something similar in production, at least in terms of APIs and functionality.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable

I do have an implementation of the code for Kafka 1.0.2 (our local production version), but I won't post it yet, as I would like to focus on the workflow and design first. That being said, I also need to add some clearer integration tests (I did a lot of testing using a non-Kafka Streams framework) and clean up the code a bit more before putting it in a PR against trunk (I can likely do so later this week).

Please take a look,

Thanks

Adam Bellemare



--

-- Guozhang