Cool stuff.

I made some random remarks. Did not touch the core of the algorithm yet.

Will do Monday 100%

I don't see Interactive Queries :) like that!




On 17.08.2018 20:28, Adam Bellemare wrote:
I have submitted a PR with my code against trunk:
https://github.com/apache/kafka/pull/5527

Do I continue on this thread or do we begin a new one for discussion?

On Thu, Aug 16, 2018 at 1:40 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Even before message headers, the option always existed for me to just wrap
the messages in my own custom envelope.
So of course I thought this through. One sentence in your last email
triggered again all the thought I put in back then into designing it in
what I think is the "Kafka way". It ended up as a bit of a rant about what
happened in the past.

I see plenty of my colleagues falling into traps in the API that I warned
about during the 1.0 DSL rewrite. I have the same feeling again. So I hope
it gives you some insight into my thought process. I am aware that since I
never ported 213 to a higher Streams version, I don't really have a stake
here, and initially I didn't feel like actually sending it. But maybe you
can pull something good from it.

  Best jan



On 15.08.2018 04:44, Adam Bellemare wrote:

@Jan
Thanks Jan. I take it you mean "key-widening" somehow includes information
about which record is processed first? I understand about a CombinedKey
with both the Foreign and Primary key, but I don't see how you track
ordering metadata in there unless you actually included a metadata field
in the key type as well.

@Guozhang
As Jan mentioned earlier, are Record Headers meant to be used strictly in
user-space? It seems possible that a collision could occur on the
(key,value) tuple I wish to add. For instance, if I wanted to add a
("foreignKeyOffset",10) to the Headers but the user already specified
their own header with the same key name, then it appears there would be a
collision. (This is one of the issues I brought up in the KIP).

--------------------------------

I will be posting a prototype PR against trunk within the next day or two.
One thing I need to point out is that my design wraps the entire
foreignKeyJoin process within the DSL function. There is no exposure of
CombinedKeys or widened keys, nothing to resolve with regards to
out-of-order processing, and no need for the DSL user to even know what's
going on inside the function. The code simply returns the results of the
join, keyed by the original key. Currently my API mirrors the format of
the data returned by the regular join function, and I believe that this is
very useful to many users of the DSL. It is my understanding that one of
the main design goals of the DSL is to provide higher-level functionality
without requiring users to know exactly what's going on under the hood.
With this in mind, I thought it best to solve ordering and partitioning
problems within the function and eliminate the requirement for users to do
additional work after the fact to resolve the results of their join.
Basically, I am assuming that most users of the DSL just "want it to work"
and want it to be easy. I did this under the assumption that if a user
truly wants to optimize their own workflow down to the finest details,
then they will break from strictly using the DSL and move down to the
Processor API.

I think the abstraction is not powerful enough to keep Kafka specifics
from leaking up. The leak I currently see is that you cannot reliably
prevent the delete from coming out first, before you emit the correct new
record, as it is an abstraction entirely around Kafka.
I can only recommend not doing this. Honesty and simplicity should always
be the first priority; trying to hide this just makes it more complex and
less understandable, and will lead to mistakes in usage.

This is exactly why I am also strongly against GraphNodes and later
optimization stages.
Can someone give me an example of an optimisation that really can't be
handled by the user constructing their topology differently?
Having reusable Processor API components accessible from the DSL and
composable as one likes is exactly where the DSL should max out and KSQL
should take the next step.
I find it very unprofessional, from a software engineering standpoint, to
run software where you cannot at least sensibly reason about the inner
workings of the libraries used.
Given that people have to read and understand it anyway, why try to hide
it?

I really miss the beauty of the 0.10 version DSL.
Apparently this is not something I can influence, only warn about.

@Guozhang
You can't imagine how many extra IQ state stores I constantly prune from
Streams apps because people just keep passing Materialized instances into
all the operations. :D :'-(
I regret that I couldn't convince you guys back then. Plus this whole
topology as a floating interface chain, never seen it anywhere :-/ :'(

I don't know. I guess this is just me regretting having only 24h/day.



I updated the KIP today with some points worth talking about, should anyone
be so inclined to check it out. Currently we are running this code in
production to handle relational joins from our Kafka Connect topics, as
per the original motivation of the KIP.










I believe the foreignKeyJoin should be responsible for. In my



On Tue, Aug 14, 2018 at 5:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Adam,
As for your question regarding GraphNodes, they are for extending the
Streams optimization framework. You can find more details at
https://issues.apache.org/jira/browse/KAFKA-6761.

The main idea is that instead of directly building up the "physical
topology" (represented as Topology in the public package, and internally
built as the ProcessorTopology class) while users are specifying the
transformation operators, we first keep it as a "logical topology"
(represented as GraphNode inside InternalStreamsBuilder). And then only
execute the optimization and the construction of the "physical" Topology
when StreamsBuilder.build() is called.

Back to your question, I think it makes more sense to add a new type of
StreamsGraphNode (maybe you can consider inheriting from the
BaseJoinProcessorNode). Note that although in the Topology we will have
multiple connected ProcessorNodes to represent a (foreign-key) join, we
still want to keep it as a single StreamsGraphNode, or just a couple of
them, in the logical representation so that in the future we can construct
the physical topology differently (e.g. having another way than the
current distributed hash-join).

-------------------------------------------------------

Back to your questions on KIP-213, I think Jan has summarized it pretty
well. Note that back then we did not have header support, so we had to use
the "key-widening" approach to ensure ordering.


Guozhang



On Mon, Aug 13, 2018 at 11:39 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi Adam,
I love how you are on to this already! I resolve this by "key-widening": I
treat the results of FK A and FK B differently.
As you can see, the output of my join has a Combined Key and therefore I
can resolve the "race condition" in a group-by if I so desire.

I think this reflects more of what happens under the hood and makes it
clearer to the user what is going on. The idea of hiding this behind
metadata and handling it in the DSL is, from my POV, not ideal.

To write into your example:

(key + A, null)
(key + B, <joined On FK = B>)

is what my output would look like.
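
A minimal sketch of the key-widening idea in plain Java, for illustration
only. It is not taken from the KIP-213 code: the names KeyWideningSketch,
CombinedKey, JoinEvent, materialize and regroup are hypothetical, a HashMap
stands in for the result table, and records require Java 16 or later. The
point it tries to show is that, because the delete and the new join result
land on different widened keys, the table ends up the same whichever one
arrives first, and collapsing back to the primary key is a separate,
explicit step.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration only; none of these names come from the KIP-213 code.
final class KeyWideningSketch {

    // Widened key: the primary key plus the foreign key that produced the result.
    record CombinedKey(String primaryKey, String foreignKey) {}

    // A join result for one widened key; a null value is a delete (tombstone).
    record JoinEvent(CombinedKey key, String value) {}

    // Applies events in arrival order to a table keyed by the widened key.
    static Map<CombinedKey, String> materialize(List<JoinEvent> events) {
        Map<CombinedKey, String> table = new HashMap<>();
        for (JoinEvent e : events) {
            if (e.value() == null) {
                table.remove(e.key());
            } else {
                table.put(e.key(), e.value());
            }
        }
        return table;
    }

    // The explicit "group by" step: collect the surviving values per primary key.
    // How to pick among several values is left to the caller on purpose.
    static Map<String, List<String>> regroup(Map<CombinedKey, String> table) {
        return table.entrySet().stream()
                .collect(Collectors.groupingBy(
                        e -> e.getKey().primaryKey(),
                        Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
    }
}
```

Feeding the delete for (key + A) and the result for (key + B) in either
order gives the same table, so the ordering question only shows up if and
when the user regroups by the primary key.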


Hope that makes sense :D

Best Jan





On 13.08.2018 18:16, Adam Bellemare wrote:

Hi Jan
If you do not use headers or other metadata, how do you ensure that
changes to the foreign-key value are not resolved out-of-order?
i.e.: If an event has FK = A, but you change it to FK = B, you need to
propagate both a delete (FK=A -> null) and an addition (FK=B). In my
solution, without maintaining any metadata, it is possible for the final
output to be in either order - the correctly updated joined value, or the
null for the delete.
(key, null)
(key, <joined On FK =B>)

or

(key, <joined On FK =B>)
(key, null)

I looked back through your code and through the discussion threads, and
didn't see any information on how you resolved this. I have a version of
my code working for 2.0, I am just adding more integration tests and will
update the KIP accordingly. Any insight you could provide on resolving
out-of-order semantics without metadata would be helpful.
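
For contrast, here is a minimal sketch of a metadata-based way to make the
two orderings above converge: keep the highest source offset seen per key
and drop anything older. It is plain Java for illustration, not the code
from the PR. The names HighwaterMarkSketch, Tagged, accept and resolve are
hypothetical, and it rests on an assumption that is not established in this
thread: that the delete is tagged with the offset of the older source
record and the addition with the offset of the newer one. Records require
Java 16 or later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; class and field names are illustrative, not from the PR.
final class HighwaterMarkSketch {

    // A join result tagged with the offset of the source record that caused it.
    // ASSUMPTION: the delete carries the old record's offset, the addition the new one.
    // A null value represents the delete.
    record Tagged(String key, String value, long sourceOffset) {}

    // Highest source offset forwarded so far, per key.
    private final Map<String, Long> highwater = new HashMap<>();

    // Returns true if the event should be forwarded, false if it is stale.
    boolean accept(Tagged event) {
        Long seen = highwater.get(event.key());
        if (seen != null && event.sourceOffset() < seen) {
            return false; // a result from a newer source record already passed
        }
        highwater.put(event.key(), event.sourceOffset());
        return true;
    }

    // Filters events in arrival order, keeping only those that are not stale.
    static List<Tagged> resolve(List<Tagged> arrivals) {
        HighwaterMarkSketch filter = new HighwaterMarkSketch();
        List<Tagged> out = new ArrayList<>();
        for (Tagged t : arrivals) {
            if (filter.accept(t)) {
                out.add(t);
            }
        }
        return out;
    }
}
```

Under that assumption, the last forwarded event is the joined value for
FK = B whether the delete arrives before or after it; a late delete is
simply dropped.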

Thanks
Adam


On Mon, Aug 13, 2018 at 3:34 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi,

Happy to see that you want to make an effort here.

Regarding the ProcessSuppliers I couldn't find a way to not rewrite the
joiners + the merger.
The re-partitioners can be reused in theory. I don't know if repartition
is optimized in 2.0 now.
I made this
https://cwiki.apache.org/confluence/display/KAFKA/KIP-241+KTable+repartition+with+compacted+Topics
back then and we are running KIP-213 with KIP-241 in combination.

For us it is vital, as it minimized the size of our repartition topics and
it removed the factor of 2 in events on every message.
I know about this new "delete once consumer has read it". I don't think
241 is vital for all use cases; for ours it is. I wanted to use 213 to
sneak in the foundations for 241 as well.

I don't quite understand what a PropagationWrapper is, but I am certain
that you do not need RecordHeaders for 213 and I would try to leave them
out. They either belong to the DSL or to the user; having a mixed use is
to be avoided. We run the join with the 0.8 log format and I don't think
one needs more.

This KIP will be very valuable for the Streams project! I could never
convince myself to invest in the 1.0+ DSL, as I used almost all my energy
to fight against it. Maybe this can also help me see the good sides a
little bit more.

If there is anything unclear with all the text that has been written, feel
free to just directly cc me so I don't miss it on the mailing list.

Best Jan





On 08.08.2018 15:26, Adam Bellemare wrote:

More followup, and +dev as Guozhang replied to me directly previously.

I am currently porting the code over to trunk. One of the major changes
since 1.0 is the usage of GraphNodes. I have a question about this:
For a foreignKey joiner, should it have its own dedicated node type? Or
would it be advisable to construct it from existing GraphNode components?
For instance, I believe I could construct it from several
OptimizableRepartitionNode, some SinkNode, some SourceNode, and several
StatefulProcessorNode. That being said, there is some underlying
complexity to each approach.

I will be switching KIP-213 to use the RecordHeaders in Kafka Streams
instead of the PropagationWrapper, but conceptually it should be the same.

Again, any feedback is welcomed...


On Mon, Jul 30, 2018 at 9:38 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:

Hi Guozhang et al

I was just reading the 2.0 release notes and noticed a section on Record
Headers.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API

I am not yet sure if the contents of a RecordHeader are propagated all the
way through the Sinks and Sources, but if they are, and if they remain
attached to the record (including null records), I may be able to ditch
the propagationWrapper for an implementation using RecordHeader. I am not
yet sure if this is doable, so if anyone understands the RecordHeader
implementation better than I do, I would be happy to hear from you.

In the meantime, let me know of any questions. I believe this PR has a lot
of potential to solve problems for other people, as I have encountered a
number of other companies in the wild all home-brewing their own solutions
to come up with a method of handling relational data in streams.

Adam


On Fri, Jul 27, 2018 at 1:45 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Adam,

Thanks for rebooting the discussion of this KIP! Let me finish my pass on
the wiki and get back to you soon. Sorry for the delays.

Guozhang

On Tue, Jul 24, 2018 at 6:08 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:

Let me kick this off with a few starting points that I would like to
generate some discussion on.
1) It seems to me that I will need to repartition the data twice - once on
the foreign key, and once back to the primary key. Is there anything I am
missing here?

2) I believe I will also need to materialize 3 state stores: the
prefixScan SS, the highwater mark SS (for out-of-order resolution) and the
final state store, due to the workflow I have laid out. I have not thought
of a better way yet, but would appreciate any input on this matter. I have
gone back through the mailing list for the previous discussions on this
KIP, and I did not see anything relating to resolving out-of-order
compute. I cannot see a way around the current three-SS structure that I
have.

3) Caching is disabled on the prefixScan SS, as I do not know how to
resolve the iterator obtained from RocksDB with that of the cache. In
addition, I must ensure everything is flushed before scanning. Since the
materialized prefixScan SS is under "control" of the function, I do not
anticipate this to be a problem. Performance throughput will need to be
tested, but as Jan observed in his initial overview of this issue, it is
generally the surge of output events that affects performance, more so
than the flush or prefixScan itself.
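
To make the prefixScan SS in points 2) and 3) more concrete, here is a
minimal sketch in plain Java. It is an illustration under assumptions, not
the implementation from the code discussed here: a TreeMap stands in for
the sorted RocksDB-backed store, the key layout (foreign key, then a '|'
separator, then primary key) is assumed, keys are assumed not to contain
the separator, and the names PrefixScanSketch, put and prefixScan are
hypothetical. Because the store is sorted, every entry for one foreign key
is contiguous, so a scan can start at the prefix and stop at the first key
that no longer matches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: a TreeMap stands in for the sorted, RocksDB-backed store.
final class PrefixScanSketch {

    // ASSUMPTION: keys are laid out as foreignKey + SEP + primaryKey,
    // and neither part contains SEP.
    private static final char SEP = '|';

    private final NavigableMap<String, String> store = new TreeMap<>();

    void put(String foreignKey, String primaryKey, String value) {
        store.put(foreignKey + SEP + primaryKey, value);
    }

    // Returns (primaryKey, value) for every entry that references the foreign key.
    List<Map.Entry<String, String>> prefixScan(String foreignKey) {
        String prefix = foreignKey + SEP;
        List<Map.Entry<String, String>> hits = new ArrayList<>();
        // Start at the first key >= prefix; matching keys are contiguous from there.
        for (Map.Entry<String, String> e : store.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break; // sorted order: we are past the prefix range
            }
            hits.add(Map.entry(e.getKey().substring(prefix.length()), e.getValue()));
        }
        return hits;
    }
}
```

A change to the record for foreign key A would then trigger prefixScan("A")
and emit one join result per hit, which is where the surge of output
events mentioned above would come from.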

Thoughts on any of these are greatly appreciated, since these
elements
are
really the cornerstone of the whole design. I can put up the code
I
have
written against 1.0.2 if we so desire, but first I was hoping to

just
tackle some of the fundamental design proposals.
Thanks,
Adam



On Mon, Jul 23, 2018 at 10:05 AM, Adam Bellemare <adam.bellem...@gmail.com> wrote:

Here is the new discussion thread for KIP-213. I picked back up on the KIP
as this is something that we too at Flipp are now running in production.
Jan started this last year, and I know that Trivago is also using
something similar in production, at least in terms of APIs and
functionality.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable

I do have an implementation of the code for Kafka 1.0.2 (our local
production version) but I won't post it yet as I would like to focus on
the workflow and design first. That being said, I also need to add some
clearer integration tests (I did a lot of testing using a non-Kafka
Streams framework) and clean up the code a bit more before putting it in a
PR against trunk (I can do so later this week likely).

Please take a look,

Thanks

Adam Bellemare



