Hi Matthias
Thank you for your feedback, I do appreciate it!
While namespacing would be possible, it would require deserializing
user headers, which implies a runtime overhead. I would suggest not
namespacing for now to avoid the overhead. If this becomes a problem in
the future, we can still add namespacing later on.
Agreed. I will go with using a reserved string and document it.
My main concern about the design is the type of the result KTable: If I
understood the proposal correctly,
In your example, you have table1 and table2 swapped. Here is how it works
currently:
1) table1 has the records that contain the foreign key within their value.
table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
table2 input stream: <A,X>, <B,Y>
2) A value mapper is required to extract the foreign key.
table1 foreign key mapper: ( value => value.fk )
The mapper is applied to each element in table1, and a new combined key is
made:
table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>
3) The rekeyed events are copartitioned with table2:
a) Stream Thread with Partition 0:
RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
Table2: <A,X>
b) Stream Thread with Partition 1:
RepartitionedTable1: <B-c, (fk=B,bar=3)>
Table2: <B,Y>
4) From here, they can be joined together locally by applying the joiner
function.
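
Steps 1 to 4 can be sketched in plain Java, with maps standing in for
the tables. This is only an illustration under stated assumptions: the
class, the Row type, the "fk-pk" string form of the combined key, and the
partitionFor helper are all made up for this example and are not part of
Kafka Streams or of the proposed API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java sketch of steps 1-4. All names here are hypothetical.
final class ForeignKeyJoinSketch {

    // Hypothetical table1 value: a foreign key plus a payload.
    static final class Row {
        final String fk;
        final int bar;

        Row(String fk, int bar) {
            this.fk = fk;
            this.bar = bar;
        }
    }

    // Step 2: apply the foreign-key mapper to each table1 entry and build
    // a combined key, written "fk-pk" as in the example above.
    static <V> Map<String, V> rekey(Map<String, V> table1, Function<V, String> fkMapper) {
        Map<String, V> out = new LinkedHashMap<>();
        table1.forEach((pk, v) -> out.put(fkMapper.apply(v) + "-" + pk, v));
        return out;
    }

    // Step 3 (assumed scheme): partition on the foreign key only, so that a
    // rekeyed record lands on the same partition as its table2 row.
    static int partitionFor(String foreignKey, int numPartitions) {
        return Math.floorMod(foreignKey.hashCode(), numPartitions);
    }

    // Step 4: join each rekeyed entry with the table2 row for its foreign key.
    static <V1, V2, R> Map<String, R> joinLocally(Map<String, V1> rekeyed,
                                                  Map<String, V2> table2,
                                                  Function<V1, String> fkMapper,
                                                  BiFunction<V1, V2, R> joiner) {
        Map<String, R> out = new LinkedHashMap<>();
        rekeyed.forEach((combinedKey, v1) -> {
            V2 v2 = table2.get(fkMapper.apply(v1));
            if (v2 != null) {
                out.put(combinedKey, joiner.apply(v1, v2));
            }
        });
        return out;
    }
}
```

With the table1 and table2 contents from the example, rekey yields the
keys A-a, A-b and B-c, and joinLocally pairs the first two with X and the
third with Y.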
At this point, Jan's design and my design deviate. My design goes on to
repartition the data post-join and resolve out-of-order arrival of records,
finally returning the data keyed by just the original key. I do not expose
the CombinedKey or any of the internals outside of the joinOnForeignKey
function. This does make for a larger footprint, but it relieves the user
of any responsibility for resolving out-of-order arrivals and handling
CombinedKeys. I believe that this makes the function much easier to use.
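
One way to picture the post-join resolution is a per-key highwater mark.
The sketch below is not the actual implementation; it assumes that every
join result carries a monotonically increasing offset taken from the
table1 update that produced it, and the class and method names are
hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of out-of-order resolution keyed by the original table1 key.
// Assumption: each join result carries the offset of the table1 update
// that triggered it, and a higher offset means a newer update.
final class HighwaterResolverSketch {
    private final Map<String, Long> highwater = new HashMap<>();
    private final Map<String, String> result = new HashMap<>();

    // Accepts the join result unless a newer one was already seen for this
    // key. Returns true if the result table was updated.
    boolean accept(String originalKey, long offset, String joinedValue) {
        Long seen = highwater.get(originalKey);
        if (seen != null && offset < seen) {
            return false; // stale result that arrived late; drop it
        }
        highwater.put(originalKey, offset);
        result.put(originalKey, joinedValue);
        return true;
    }

    // Current join result for the original key, or null if there is none.
    String current(String originalKey) {
        return result.get(originalKey);
    }
}
```

The caller only ever sees the original key and the joined value; the
offsets stay inside the resolver.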
Let me know if this helps resolve your questions, and please feel free to
add anything else on your mind.
Thanks again,
Adam
On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matth...@confluent.io>
wrote:
Hi,
I am just catching up on this thread. I did not read everything so far,
but want to share a couple of initial thoughts:
Headers: I think there is a fundamental difference between header usage
in this KIP and KIP-258. For 258, we add headers to changelog topics that
are owned by Kafka Streams and nobody else is supposed to write into
them. In fact, no user headers are written into the changelog topics and
thus, there are no conflicts.
Nevertheless, I don't see a big issue with using headers within Streams.
As long as we document it, we can have some "reserved" header keys that
users are not allowed to use when processing data with Kafka Streams.
IMHO, this should be ok.
I think there is a safe way to avoid conflicts, since these headers are
only needed in internal topics (I think):
For internal and changelog topics, we can namespace all headers:
* user-defined headers are namespaced as "external." + headerKey
* internal headers are namespaced as "internal." + headerKey
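
The namespacing scheme in the two bullets above could look roughly like
this. It is a sketch only: headers are modeled as a plain map from key to
bytes rather than with the Kafka Headers classes, and the class and method
names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of namespacing header keys for internal and changelog topics.
// Headers are modeled as a simple map; this is not the Kafka Headers API.
final class HeaderNamespaceSketch {
    static final String EXTERNAL_PREFIX = "external.";
    static final String INTERNAL_PREFIX = "internal.";

    // Prefix every key before writing to an internal topic, so a user
    // header and a Streams header with the same name cannot collide.
    static Map<String, byte[]> toInternalTopic(Map<String, byte[]> userHeaders,
                                               Map<String, byte[]> streamsHeaders) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        userHeaders.forEach((k, v) -> out.put(EXTERNAL_PREFIX + k, v));
        streamsHeaders.forEach((k, v) -> out.put(INTERNAL_PREFIX + k, v));
        return out;
    }

    // Strip the prefix again to recover the user's headers on the way out.
    static Map<String, byte[]> restoreUserHeaders(Map<String, byte[]> namespaced) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        namespaced.forEach((k, v) -> {
            if (k.startsWith(EXTERNAL_PREFIX)) {
                out.put(k.substring(EXTERNAL_PREFIX.length()), v);
            }
        });
        return out;
    }
}
```

Note that both directions touch every header key of every record.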
While namespacing would be possible, it would require deserializing
user headers, which implies a runtime overhead. I would suggest not
namespacing for now to avoid the overhead. If this becomes a problem in
the future, we can still add namespacing later on.
My main concern about the design is the type of the result KTable: If I
understood the proposal correctly,
KTable<K1,V1> table1 = ...
KTable<K2,V2> table2 = ...
KTable<K1,V3> joinedTable = table1.join(table2,...);
implies that the `joinedTable` has the same key as the left input table.
IMHO, this does not work because if table2 contains multiple rows that
join with a record in table1 (which is the main purpose of a foreign key
join), the result table would only contain a single join result, but not
multiple.
Example:
table1 input stream: <A,X>
table2 input stream: <a,(A,1)>, <b,(A,2)>
We use the table2 value as a foreign key to the table1 key (i.e., "A" joins).
If the result key is the same as the key of table1, this implies that the
result can either be <A, join(X,1)> or <A, join(X,2)> but not both.
Because they share the same key, whatever result record we emit later
overwrites the previous result.
This is the reason why Jan originally proposed to use a combination of
both primary keys of the input tables as key of the output table. This
makes the keys of the output table unique and we can store both in the
output table:
Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
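
The difference between the two keying choices can be shown with ordinary
maps. This is a plain-Java sketch of the example, not Kafka Streams code;
the class name, the combinedKey flag, and the "A-a" string form of the
combined key are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: the same join emitted under two different result keys.
final class ResultKeySketch {

    // table1: key -> value. table2: key -> (foreign key into table1, value).
    // With combinedKey == false the result is keyed by the table1 key only.
    static Map<String, String> join(Map<String, String> table1,
                                    Map<String, Map.Entry<String, Integer>> table2,
                                    boolean combinedKey) {
        Map<String, String> out = new LinkedHashMap<>();
        table2.forEach((k2, v2) -> {
            String fk = v2.getKey();
            String left = table1.get(fk);
            if (left == null) {
                return; // no matching table1 row, nothing to emit
            }
            String resultKey = combinedKey ? fk + "-" + k2 : fk;
            // put() overwrites: with a non-unique key, the later result wins.
            out.put(resultKey, "join(" + left + "," + v2.getValue() + ")");
        });
        return out;
    }
}
```

Keyed by the table1 key alone, the two results for "A" collapse into one
entry; keyed by the combined key, both survive.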
Thoughts?
-Matthias
On 9/4/18 1:36 PM, Jan Filipiak wrote:
Just one remark here.
The high-watermark could be disregarded. The decision about the forward
depends on the size of the aggregated map.
Only 1 element long maps would be unpacked and forwarded. 0 element maps
would be published as delete. Any other count
of map entries is in "waiting for correct deletes to arrive"-state.
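
The forwarding rule described in this remark, as a minimal sketch. The
enum and method names are hypothetical, and the contents of the
aggregated map are left generic because the remark does not specify them.

```java
import java.util.Map;
import java.util.Optional;

// Sketch of the forwarding decision based on the aggregated map's size.
final class AggregatedMapForwardSketch {
    enum Action { FORWARD, DELETE, WAIT }

    // 0 entries: publish a delete. 1 entry: unpack and forward.
    // Anything else: still waiting for the correct deletes to arrive.
    static Action decide(Map<?, ?> aggregated) {
        switch (aggregated.size()) {
            case 0:
                return Action.DELETE;
            case 1:
                return Action.FORWARD;
            default:
                return Action.WAIT;
        }
    }

    // Unpacks the single value of a one-element map; empty otherwise.
    static <V> Optional<V> unpack(Map<?, V> aggregated) {
        if (aggregated.size() != 1) {
            return Optional.empty();
        }
        return Optional.of(aggregated.values().iterator().next());
    }
}
```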
On 04.09.2018 21:29, Adam Bellemare wrote:
It does look like I could replace the second repartition store and
highwater store with a groupBy and reduce. However, it looks like I would
still need to store the highwater value within the materialized store, to
compare the arrival of out-of-order records (assuming my understanding of
THIS is correct...). This in effect is the same as the design I have now,
just with the two tables merged together.