Hi, I am just catching up on this thread. I have not read everything so far, but I want to share a couple of initial thoughts:
Headers: I think there is a fundamental difference between header usage in this KIP and KIP-258. For KIP-258, we add headers to changelog topics that are owned by Kafka Streams, and nobody else is supposed to write into them. In fact, no user headers are written into the changelog topic, and thus there are no conflicts.

Nevertheless, I don't see a big issue with using headers within Streams. As long as we document it, we can have some "reserved" header keys that users are not allowed to use when processing data with Kafka Streams. IMHO, this should be OK.

> I think there is a safe way to avoid conflicts, since these headers are
> only needed in internal topics (I think):
> For internal and changelog topics, we can namespace all headers:
> * user-defined headers are namespaced as "external." + headerKey
> * internal headers are namespaced as "internal." + headerKey

While namespacing would be possible, it would require deserializing user headers, which implies a runtime overhead. I would suggest not namespacing for now to avoid the overhead. If this becomes a problem in the future, we can still add namespacing later on.

My main concern about the design is the type of the result KTable. If I understood the proposal correctly,

    KTable<K1,V1> table1 = ...
    KTable<K2,V2> table2 = ...
    KTable<K1,V3> joinedTable = table1.join(table2,...);

implies that `joinedTable` has the same key as the left input table. IMHO, this does not work: if table2 contains multiple rows that join with a record in table1 (which is the main purpose of a foreign-key join), the result table would contain only a single join result, not all of them.

Example:

table1 input stream: <A,X>
table2 input stream: <a,(A,1)>, <b,(A,2)>

We use table2's value as a foreign key into table1's key (i.e., "A" joins). If the result key is the same as the key of table1, the result can be either <A, join(X,1)> or <A, join(X,2)>, but not both.
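The collision can be reproduced with a small sketch that models each KTable as an in-memory map and replays the example above. The class `ForeignKeyJoinKeys`, the `Row` record, and the `join` helper are hypothetical illustrations, not Kafka Streams API. Keying the result by table1's key keeps only the last join result, while keying it by both primary keys keeps every result:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ForeignKeyJoinKeys {

    // Hypothetical table2 row: its primary key, the foreign key into table1, and a payload.
    record Row(String pk, String fk, int value) {}

    // table1 input stream: <A,X>
    static final Map<String, String> TABLE1 = Map.of("A", "X");

    // table2 input stream: <a,(A,1)>, <b,(A,2)>
    static final List<Row> TABLE2 = List.of(new Row("a", "A", 1), new Row("b", "A", 2));

    // Hypothetical stand-in for the user's value joiner.
    static String join(String left, int right) {
        return "join(" + left + "," + right + ")";
    }

    // Emits every join result into a map that models the result KTable.
    // combinedKey == false: key by table1's key only (K1).
    // combinedKey == true:  key by table1's key plus table2's primary key.
    static Map<String, String> joinedTable(boolean combinedKey) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Row row : TABLE2) {
            String left = TABLE1.get(row.fk());
            if (left == null) {
                continue; // no matching table1 record, so an inner join emits nothing
            }
            String key = combinedKey ? row.fk() + "-" + row.pk() : row.fk();
            result.put(key, join(left, row.value())); // a later put with an equal key overwrites
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(joinedTable(false)); // {A=join(X,2)}
        System.out.println(joinedTable(true));  // {A-a=join(X,1), A-b=join(X,2)}
    }
}
```

With the K1-only key, `join(X,1)` is silently replaced by `join(X,2)`; with the combined key, both rows survive.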
Because both results share the same key, whichever result record we emit later overwrites the previous one. This is the reason why Jan originally proposed to use a combination of both input tables' primary keys as the key of the output table. This makes the keys of the output table unique, so we can store both results in the output table:

Result would be <A-a, join(X,1)>, <A-b, join(X,2)>

Thoughts?

-Matthias

On 9/4/18 1:36 PM, Jan Filipiak wrote:
> Just one remark here.
> The high-watermark could be disregarded. The decision about the forward
> depends on the size of the aggregated map.
> Only 1 element long maps would be unpacked and forwarded. 0 element maps
> would be published as delete. Any other count
> of map entries is in "waiting for correct deletes to arrive"-state.
>
> On 04.09.2018 21:29, Adam Bellemare wrote:
>> It does look like I could replace the second repartition store and
>> highwater store with a groupBy and reduce. However, it looks like I
>> would
>> still need to store the highwater value within the materialized store, to
>> compare the arrival of out-of-order records (assuming my understanding of
>> THIS is correct...). This in effect is the same as the design I have now,
>> just with the two tables merged together.
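Jan's forwarding rule in the quoted remark can be sketched as a pure decision on the size of the aggregated map. This assumes the aggregate is available as a `java.util.Map`; the class `MapSizeForwarding`, the `Decision` enum, and the `unpack` helper are hypothetical names and do not correspond to any Kafka Streams API:

```java
import java.util.Map;
import java.util.Optional;

public class MapSizeForwarding {

    // Hypothetical outcomes for one aggregated map.
    enum Decision { FORWARD, DELETE, WAIT }

    // The decision depends only on the number of entries in the aggregated map.
    static Decision decide(Map<?, ?> aggregated) {
        switch (aggregated.size()) {
            case 0:
                return Decision.DELETE;  // no entries left: publish a delete
            case 1:
                return Decision.FORWARD; // exactly one entry: unpack and forward it
            default:
                return Decision.WAIT;    // waiting for the correct deletes to arrive
        }
    }

    // Returns the single entry to forward, or empty when nothing should be forwarded.
    static <K, V> Optional<Map.Entry<K, V>> unpack(Map<K, V> aggregated) {
        return decide(aggregated) == Decision.FORWARD
                ? Optional.of(aggregated.entrySet().iterator().next())
                : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(decide(Map.of()));               // DELETE
        System.out.println(decide(Map.of("a", 1)));         // FORWARD
        System.out.println(decide(Map.of("a", 1, "b", 2))); // WAIT
    }
}
```

Because the decision reads only the map size, no separate high-watermark value has to be consulted, which is the point of the remark.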