Hi Guozhang,
thank you for the remarks!
I answered inline for clarity.
Thank you
On 02.11.2017 01:17, Guozhang Wang wrote:
Thanks for the KIP writeup Jan. I made a first pass and here are some quick
comments:
1. Could we use K0 / V0 and K1 / V1 etc since K0 and KO are a bit harder to
differentiate when reading.
Will address
2. I think you missed the key type in the intrusive approach example code
snippet regarding "KTable <V0> oneToManyJoin"? Should that be
KTable<CombinedKey<K,KO>, V0> oneToManyJoin
addressed
3. Some of the arrows in your algorithm section's diagrams seem reversed.
True! Is there a preferred tool for showing DAGs? I feel that an ASCII-art
PNG might not be the way to go.
4. In the first step of the algorithm, "Materialize B first", that happens
in the "Repartition by A's key" block right? If yes, could you clarify it
in the block?
Yes, correct. I need a better way to draw the DAG, especially to show
what is a processor and which processors belong to a task.
5. "skip old if A's key didn't change": hmm, not sure if we can skip it.
What if other fields (neither A's key nor B's key) change? Suppose you have
an aggregation after the join, we still need to subtract the old value from
the aggregation right?
We can. We materialize B again after the repartition for the range scan
anyway, so we can retrieve the old fields from there.
For "deep" topologies this de-duplication is essential. Every change we
split we can't re-unite. That is a factor of 2 in the number of events
running through our DAG. It hurts from the beginning, but once
you hit repartition 4 or 5 it becomes really painful. To be honest,
we never repartition Changes with ChangedSerializer. Proper
repartitioning (putting the actual record into its new partition) is the
only way forward for us. That also gives us compaction on the
repartitioned topics.
6. In the block of "Materialize B", I think from your description we are
actually materializing both A and B right? If yes could you update the
diagram?
It says in the list that we materialize A. I will add that all the "*"
things are supposed to receive an enableSendOld call.
7. This is a meta question: "in the sink, only use A's key to determine
partition" I think we had the discussion long time ago, that if we are
sending the old and new entries of the pair to different partitions, their
ordering may get reversed later when reading from the join operator (i.e.
the "Materialize B" block in your diagram). How did you address that with
this proposal?
That is the question I want to settle in this KIP. Both of my
approaches delegate this to the output key.
With the keys of both tables combined, one can no longer treat the records
as keyed by B's key only. (This is common in RDBMS.)
Instead the key is "opened" to contain both keys. This resolves the
race condition, as one record is going to have a different A key
than the other. Only one of them is going to last; the other one is
"pending" deletion.
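The effect of the combined key can be sketched in plain Java, with a HashMap standing in for the result table. This is only an illustration under assumptions: the class and method names are hypothetical, a List of two strings stands in for CombinedKey, and a null value stands in for a tombstone. It shows that when B's row moves from one A key to another, the delete and the insert hit different combined keys, so the final state does not depend on their arrival order.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CombinedKeySketch {
    // Applies one update to the table; a null value acts as a tombstone (delete).
    static void apply(Map<List<String>, String> table, String aKey, String bKey, String value) {
        List<String> key = List.of(aKey, bKey); // hypothetical stand-in for CombinedKey<A, B>
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    // B's row "b1" moves from A key "a1" to "a2": one delete under the old
    // combined key and one insert under the new one, applied in either order.
    static Map<List<String>, String> moveB(boolean deleteFirst) {
        Map<List<String>, String> table = new HashMap<>();
        apply(table, "a1", "b1", "old");
        if (deleteFirst) {
            apply(table, "a1", "b1", null);
            apply(table, "a2", "b1", "new");
        } else {
            apply(table, "a2", "b1", "new");
            apply(table, "a1", "b1", null);
        }
        return table;
    }
}
```

Both orders leave only the entry under ("a2", "b1"). In the insert-first order the old entry under ("a1", "b1") briefly coexists with the new one, which corresponds to the record "pending" deletion.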
8. "B records with a 'null' A-key value would be silently dropped" Where
are we dropping it, do we drop it at the first sub-topology (i.e the
"Repartition by A's key" block)?
Yes, this is where we would apply the extractor to get it. This only
works if we use the intrusive approach, though.
Otherwise we wouldn't know that A's key is null. In that case we would
need to have the user filter first.
Two things are to be considered, I think:
1. If there are many records with null, the partition they end up in
might run hot.
2. We need a way to indicate to the iterator that this key is not to be
found. An empty byte array would join against all records.
Null might work, but no Serde is going to give us null; they all produce some
byte[].
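Why an empty byte array is a poor marker for "no A key" can be shown with a small, self-contained Java sketch. The names are hypothetical and the "a2|b2" key layout is made up for illustration; the only point is that every byte[] starts with the empty prefix, so a prefix match against it selects every stored key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

final class EmptyPrefixSketch {
    // True if the first prefix.length bytes of key equal prefix.
    static boolean hasPrefix(byte[] key, byte[] prefix) {
        return key.length >= prefix.length
                && Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length);
    }

    // Counts the stored keys that a prefix scan with the given prefix would return.
    static long countMatches(List<byte[]> keys, byte[] prefix) {
        return keys.stream().filter(k -> hasPrefix(k, prefix)).count();
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```

With three stored keys, countMatches(keys, new byte[0]) returns 3, while a real prefix such as bytes("a2|") only matches the keys that start with it.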
Guozhang
On Wed, Nov 1, 2017 at 12:18 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:
Hi thanks for the feedback
On 01.11.2017 12:58, Damian Guy wrote:
Hi Jan, Thanks for the KIP!
In both alternatives the API will need to use the `Joined` class rather
than passing in `Serde`s. Also, as with all other joins etc., there
probably should be an overload that doesn't require any `Serdes`.
Will check again how the current API looks. I remember losing the argument
about these IQ overloads.
I didn't expect something to have happened already, so I just copied from the
PR. Will update.
Will also add the overload.
It isn't clear to me what `joinPrefixFaker` is doing. In the comment it
says "returning an outputKey that when serialized only produces a prefix
of the output key which is the same serializing K". So why not just use "K"?
The faker in fact returns a K, which can be serialized by the key Serde of the
RocksDB store. But it needs to contain only A's key, and its serialized form
needs to be a strict prefix byte[] of all K with this A's key. We are going to
seek there with a RocksIterator and continue to read as long as the serialized
form of the "faked key" is a prefix.
This is easy to do for Avro, Protobuf, custom Serdes and Hadoop
Writables. It's a nightmare for JSON Serdes.
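The seek-and-read-while-prefix idea can be sketched without RocksDB, using a TreeMap with an unsigned bytewise comparator as a stand-in for the sorted store. Everything here is an assumption for illustration: the class name, the method names, and in particular the key layout (a 4-byte length of A's key, then A's key, then B's key), which is just one way to make the serialized "faked key" a strict byte prefix of every combined key with the same A key. It is not the serialization used in the PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

final class PrefixScanSketch {
    // Unsigned lexicographic byte order, used here to mimic a bytewise-sorted store.
    static final Comparator<byte[]> BYTEWISE = Arrays::compareUnsigned;

    // Hypothetical layout: [4-byte length of A's key][A's key][B's key].
    static byte[] combinedKey(String aKey, String bKey) {
        byte[] a = aKey.getBytes(StandardCharsets.UTF_8);
        byte[] b = bKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + a.length + b.length).putInt(a.length).put(a).put(b).array();
    }

    // The "faked" key: only A's part, a byte prefix of every combined key for that A.
    static byte[] prefixFor(String aKey) {
        byte[] a = aKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + a.length).putInt(a.length).put(a).array();
    }

    static boolean startsWith(byte[] key, byte[] prefix) {
        return key.length >= prefix.length
                && Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length);
    }

    // Seek to the prefix, then read forward until the prefix no longer matches.
    static List<String> scan(NavigableMap<byte[], String> store, String aKey) {
        byte[] prefix = prefixFor(aKey);
        List<String> hits = new ArrayList<>();
        for (Map.Entry<byte[], String> e : store.tailMap(prefix, true).entrySet()) {
            if (!startsWith(e.getKey(), prefix)) {
                break; // keys sharing a prefix are contiguous in sorted order
            }
            hits.add(e.getValue());
        }
        return hits;
    }
}
```

The length field is what keeps A key "a" from matching combined keys of A key "a1". A format that wraps the whole key in delimiters, as JSON does with its closing brace, has no such natural prefix, which is one way to read the remark about JSON Serdes.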
Thanks,
Damian
On Fri, 27 Oct 2017 at 10:27 Ted Yu <yuzhih...@gmail.com> wrote:
I think if you explain what A and B are in the beginning, it makes sense to
use them since readers would know who they reference.
Cheers
On Thu, Oct 26, 2017 at 11:04 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:
Thanks for the remarks. Hope I didn't miss any.
I'm not even sure whether it makes sense to introduce A and B or to just
stick with "this ktable", "other ktable".
Thank you
Jan
On 27.10.2017 06:58, Ted Yu wrote:
Do you mind addressing my previous comments?
http://search-hadoop.com/m/Kafka/uyzND1hzF8SRzUqb?subj=Re+DISCUSS+KIP+213+Support+non+key+joining+in+KTable
On Thu, Oct 26, 2017 at 9:38 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:
Hello everyone,
this is the new discussion thread after the ID-clash.
Best
Jan
______
Hello Kafka-users,
I want to continue with the development of KAFKA-3705, which allows the
Streams DSL to perform KTable-KTable joins when the KTables have a
one-to-many relationship.
To make sure we cover the requirements of as many users as possible and
have a good solution afterwards, I invite everyone to read through the KIP I
put together and discuss it here in this thread.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
https://issues.apache.org/jira/browse/KAFKA-3705
https://github.com/apache/kafka/pull/3720
I think a public discussion and vote on a solution is exactly what is
needed to bring this feature into kafka-streams. I am looking forward to
everyone's opinion!
Please keep the discussion on the mailing list rather than commenting on
the wiki (wiki discussions get unwieldy fast).
Best
Jan