Hey everyone,

Two more minor updates to the KIP came out of wrapping up the implementation and offline discussions with Guozhang and Matthias:

- Versioned stores will be disabled for global tables and for the suppress operator, in order to limit the scope of these changes and to prevent unexpected semantic consequences.
- The return type of `VersionedKeyValueStore#put(...)` has been updated from `void` to `long`, where the long represents the validTo timestamp of the newly put record, and may also be used to determine whether the call to put() was accepted or not (a put may be rejected if the grace period has expired). This is necessary in order to allow processors to determine whether records are out-of-order or not.

The KIP has been updated with more details about each of the two changes. Barring objections, the current state of the KIP is what will be released with 3.5, as the release deadline is fast approaching.

Thanks,
Victoria

On Tue, Apr 11, 2023 at 2:56 PM Victoria Xia <victoria....@confluent.io> wrote:

> Thanks for your comments and suggestions, Matthias, Lucas, and Guozhang!
>
> I was just in the process of responding when I saw Guozhang's message. I
> came up with a different approach to simplify my proposal with regard to
> the table aggregate processor, as part of mulling over comments from
> Matthias and Lucas: when aggregating a versioned table, instead of dropping
> out-of-order records at the aggregate processor (after the repartition), we
> can equivalently drop them before the repartition at the repartition map
> processor. With this new approach, no changes to the repartition topic
> format are necessary as part of this KIP.
>
> As an example, consider a table aggregation which counts the number of
> appearances of each value of the table. The repartition prior to the
> aggregation will map values (pre-repartition) to keys (post-repartition) as
> part of the groupBy(), in order for subsequent aggregation over the
> (original) values to occur.
>
> source record --> repartition record(s)
> (key=k, value=v1, ts=2) --> (key=v1, newValue=v1, oldValue=null, newTs=2, oldTs=-1)
> (key=k, value=v2, ts=1) --> (key=v2, newValue=v2, oldValue=null, newTs=1, oldTs=2), (key=v1, newValue=null, oldValue=v1, newTs=1, oldTs=2)
>
> Under the old proposal, the aggregate processor would see the last two
> repartition records and drop them as out-of-order (because newTs is older
> than oldTs). Under the new proposal, the repartition map processor would
> not send any repartition records upon seeing the second source record (with
> value=v2) because its timestamp is older than the latest timestamp seen for
> the key (k).
>
> This new approach is simpler than what's currently written in the KIP
> because no repartition topic format change is required, and also saves on
> unnecessary repartition topic records.
>
> In comparing this suggestion to Guozhang's suggestion:
>
> - A main difference between the two proposals is that under this
>   proposal, the "old value" when an out-of-order record is forwarded
>   downstream from a versioned table would be the current latest (by
>   timestamp) value, rather than the same as the new value (as in Guozhang's
>   proposal).
> - In both proposals, the various processors (table aggregate, joins,
>   suppress) actually do not need to call get(key) on the materialization to
>   determine whether the record is out-of-order or not. If the "old value" is
>   the latest record, then a timestamp comparison would suffice (assuming that
>   the old record timestamp is added into the `Change` object). If the "old
>   value" is the same as the "new value", then an equality check on the two
>   values is sufficient.
> - In both proposals, the repartition topic format does not need to be
>   updated.
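
The drop-before-repartition idea described above can be illustrated with a small, self-contained Java sketch. This is a toy model under stated assumptions, not Kafka Streams code: `RepartitionMapSketch`, `process`, and the string-formatted records are hypothetical names chosen for illustration, the two per-key maps stand in for the latest record of the versioned source table, and tombstones are ignored. It shows only the timestamp comparison: a source record whose timestamp is older than the latest timestamp seen for its key produces no repartition records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a repartition map step in front of a table aggregation.
// None of these names come from Kafka Streams.
final class RepartitionMapSketch {
    // Latest timestamp and value seen per source key
    // (a stand-in for the latest record in the versioned source table).
    private final Map<String, Long> latestTs = new HashMap<>();
    private final Map<String, String> latestValue = new HashMap<>();

    // Returns the repartition records (rendered as strings) for one source record.
    // The groupBy() in this model uses the source value as the new key.
    List<String> process(String key, String value, long ts) {
        List<String> out = new ArrayList<>();
        Long seenTs = latestTs.get(key);
        if (seenTs != null && ts < seenTs) {
            // Out-of-order: older than the latest timestamp seen for this key,
            // so nothing is sent to the repartition topic.
            return out;
        }
        String oldValue = latestValue.get(key);
        if (oldValue != null) {
            // Retract the previous value from its old group.
            out.add("(key=" + oldValue + ", newValue=null, oldValue=" + oldValue + ")");
        }
        // Add the new value to its new group.
        out.add("(key=" + value + ", newValue=" + value + ", oldValue=null)");
        latestTs.put(key, ts);
        latestValue.put(key, value);
        return out;
    }

    public static void main(String[] args) {
        RepartitionMapSketch map = new RepartitionMapSketch();
        System.out.println(map.process("k", "v1", 2L)); // in-order record
        System.out.println(map.process("k", "v2", 1L)); // out-of-order record
    }
}
```

With the two source records from the example above, the first call yields one repartition record and the second yields none, so the aggregate processor never sees the out-of-order update and no extra timestamp is needed in the repartition topic.
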
>
> I think regardless of which of the two implementations we go with, the net
> effect will be hidden from users, in which case it may be better to discuss
> which to pick as part of implementation rather than on this KIP itself. (I
> can tell I will need more time to process the tradeoffs :-) ) Regardless, I
> will update the KIP to reflect that no repartition topic format change is
> required, which is indeed a great simplification.
>
> > for the repartition topic format change, do we want to re-use flag=2, or
> > should we introduce flag=3, and determine when compiling the DSL into the
> > Topology if we want/need to include the timestamp, and if not, use format
> > version=2 to avoid unnecessary overhead?
>
> I believe the new proposal above is even more efficient in terms of
> avoiding unnecessary overhead. LMK what you think.
>
> > for detecting out-of-order records, I need both new and old timestamp, I
> > suppose I get it for the new record via timestamp extractor, can I not get
> > it the same way from the old record that is passed down to the aggregation
> > after KIP-904?
>
> Also subsumed by the updated proposal above, but I think the answer is "not
> necessarily", both for the reason Matthias gave (when the old and new
> records are sent in the same message, there is only one timestamp) and
> because even when the old record is sent on its own, it is sent with the
> new record's timestamp in the timestamp field (link
> <https://github.com/apache/kafka/blob/e49a5a265fd2d60d197b940b7c2a6867f7b90cb1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L148>).
> So, if the timestamp extractor extracts the timestamp from the key/value of
> the message, then yes all is well, but if the timestamp extractor uses the
> record timestamp, then this timestamp will not be accurate for the old
> record.
>
> On Tue, Apr 11, 2023 at 2:26 PM Guozhang Wang <guozhang.wang...@gmail.com>
> wrote:
>
>> Thanks Victoria.
>>
>> 1) I have no concerns about the filter operator's proposed semantics.
>>
>> 2) For aggregations, I have a meta question in mind to discuss first:
>> for those operators that generate a table which is materialized as
>> versioned, how should we emit out-of-order data if the operator still
>> needs to? A quick look at the following operations today:
>>
>> a. Read topic as a table source
>> b. KStream#toTable
>> c. Aggregated from a stream
>> d. Aggregated from a table
>> e. From a table-table join.
>> f. From a stateless table operator like a `table.filter`.
>>
>> Per this KIP, cases d) and e) should not emit out-of-order records to
>> the downstream any more, so we only need to consider the others: today
>> when we send out the old/new pairs downstream, the old value is just
>> the value read right before being overwritten from the materialized
>> store. If the store is versioned, however, then this old value has
>> already been sent before as part of the new/old pair, so it's actually
>> correct to just indicate the old/new pair as just the out-of-order
>> record itself?
>> More specifically: say we are given a table source operator,
>> with the topic's incoming records for the same key:
>>
>> (A, t10), (B, t20), (C, t15)
>>
>> If the store is not versioned, we would emit, in the form of new/old:
>>
>> A10/null, B20/A10, C15/B20
>>
>> While if there's no such out-of-ordering, the ideal emit ordering should
>> be:
>>
>> A10/null, C15/A10, B20/C15
>>
>> So I'm thinking, if the store is versioned, we should try to emit in a
>> way that is as coherent with the ideal ordering as possible, for the
>> downstream operators to handle:
>>
>> A10/null, B20/A10, C15/null, null/C15 (or to be succinct, just
>> A10/null, B20/A10, C15/C15)
>>
>> This is because the A10 is already sent as part of the B20/A10 before
>> C comes, in order for downstream operators to negate its effect; so
>> when C comes, we only need to let the downstream know that "there was
>> a C coming at t15 between A at 10 and B at 20, which is already
>> obsoleted because of the later B20 that I sent you before".
>>
>> This gives the underlying operator the correct information, so that it
>> can handle it accordingly:
>>
>> * For aggregate operators, it can simply ignore the C15/C15 from upstream.
>> * For stateless operators, it still just applies the filter on C15/C15
>>   and forwards the result downstream.
>> * For join operators, as this KIP indicated, it would apply the join
>>   if necessary and not emit the older join results.
>>
>> If we can do that, then maybe we do not even need to change the
>> repartition topic format again?
>>
>>
>> Guozhang
>>
>> On Tue, Apr 11, 2023 at 11:17 AM Matthias J. Sax <mj...@apache.org>
>> wrote:
>> >
>> > If we send old and new value as two messages, this should work I guess?
>> > Victoria could confirm. -- But not if we send old/new as a single message
>> > in case the new-key does not change?
>> >
>> > -Matthias
>> >
>> > On 4/11/23 5:25 AM, Lucas Brutschy wrote:
>> > > Hi,
>> > >
>> > > No concerns at all, just a clarifying question from my side: for
>> > > detecting out-of-order records, I need both new and old timestamp, I
>> > > suppose I get it for the new record via timestamp extractor, can I not
>> > > get it the same way from the old record that is passed down to the
>> > > aggregation after KIP-904?
>> > >
>> > > Thanks,
>> > > Lucas
>> > >
>> > > On Tue, Apr 11, 2023 at 5:35 AM Matthias J. Sax <mj...@apache.org> wrote:
>> > >>
>> > >> Thanks.
>> > >>
>> > >> One question: for the repartition topic format change, do we want to
>> > >> re-use flag=2, or should we introduce flag=3, and determine when
>> > >> compiling the DSL into the Topology if we want/need to include the
>> > >> timestamp, and if not, use format version=2 to avoid unnecessary overhead?
>> > >>
>> > >>
>> > >> -Matthias
>> > >>
>> > >> On 4/10/23 5:47 PM, Victoria Xia wrote:
>> > >>> Hi everyone,
>> > >>>
>> > >>> While wrapping up the implementation for KIP-914, I have discovered that
>> > >>> two more DSL processors require semantic updates in the presence of
>> > >>> versioned tables:
>> > >>>
>> > >>> - The table filter processor has an optimization to drop nulls if the
>> > >>>   previous filtered value is also null. When the upstream table is versioned,
>> > >>>   this optimization should be disabled in order to preserve proper version
>> > >>>   history in the presence of out-of-order data.
>> > >>> - When performing an aggregation over a versioned table, only the latest
>> > >>>   value by timestamp (per key) should be included in the final aggregate
>> > >>>   value. This is not happening today in the presence of out-of-order data,
>> > >>>   due to the way that TableSourceNodes call `get(key)` in order to determine
>> > >>>   the "old value" which is to be removed from the aggregate as part of
>> > >>>   applying an update.
>> > >>>   To fix this, aggregations should ignore out-of-order
>> > >>>   records when aggregating versioned tables.
>> > >>>     - In order to implement this change, table aggregate processors need
>> > >>>       a way to determine whether a record is out-of-order or not. This cannot be
>> > >>>       done by querying the source table value getter as that store belongs to a
>> > >>>       different subtopology (because a repartition occurs before aggregation). As
>> > >>>       such, an additional timestamp must be included in the repartition topic.
>> > >>>       The 3.5 release already includes an update to the repartition topic format
>> > >>>       (with upgrade implications properly handled) via KIP-904
>> > >>>       <https://cwiki.apache.org/confluence/display/KAFKA/KIP-904%3A+Kafka+Streams+-+Guarantee+subtractor+is+called+before+adder+if+key+has+not+changed>,
>> > >>>       so making an additional change to the repartition topic format to add a
>> > >>>       timestamp comes at no additional cost to users.
>> > >>>
>> > >>>
>> > >>> I have updated the KIP
>> > >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores>
>> > >>> itself with more detail about each of these changes. Please let me know if
>> > >>> there are any concerns. In the absence of dissent, I'd like to include
>> > >>> these changes along with the rest of KIP-914 in the 3.5 release.
>> > >>>
>> > >>> Apologies for not noticing these additional semantics implications earlier,
>> > >>> Victoria
>> > >>>
>> > >>> ---------- Forwarded message ---------
>> > >>> From: Victoria Xia <victoria....@confluent.io>
>> > >>> Date: Wed, Mar 22, 2023 at 10:08 AM
>> > >>> Subject: Re: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores
>> > >>> To: <dev@kafka.apache.org>
>> > >>>
>> > >>>
>> > >>> Thanks for voting, everyone! We have three binding yes votes with no
>> > >>> objections during four full days of voting.
>> > >>> I will close the vote and mark
>> > >>> the KIP as accepted, right in time for the 3.5 release.
>> > >>>
>> > >>> Thanks,
>> > >>> Victoria
>> > >>>
>> > >>> On Wed, Mar 22, 2023 at 7:11 AM Bruno Cadonna <cado...@apache.org> wrote:
>> > >>>
>> > >>>> +1 (binding)
>> > >>>>
>> > >>>> Thanks Victoria!
>> > >>>>
>> > >>>> Best,
>> > >>>> Bruno
>> > >>>>
>> > >>>> On 20.03.23 17:13, Matthias J. Sax wrote:
>> > >>>>> +1 (binding)
>> > >>>>>
>> > >>>>> On 3/20/23 9:05 AM, Guozhang Wang wrote:
>> > >>>>>> +1, thank you Victoria!
>> > >>>>>>
>> > >>>>>> On Sat, Mar 18, 2023 at 8:27 AM Victoria Xia
>> > >>>>>> <victoria....@confluent.io.invalid> wrote:
>> > >>>>>>>
>> > >>>>>>> Hi all,
>> > >>>>>>>
>> > >>>>>>> I'd like to start a vote on KIP-914 for updating the Kafka Streams join
>> > >>>>>>> processors to use proper timestamp-based semantics in applications with
>> > >>>>>>> versioned stores:
>> > >>>>>>>
>> > >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores
>> > >>>>>>>
>> > >>>>>>> To avoid compatibility concerns, I'd like to include the changes from
>> > >>>>>>> this KIP together with KIP-889
>> > >>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores>
>> > >>>>>>> (for introducing versioned stores) in the upcoming 3.5 release. I will
>> > >>>>>>> close the vote on the 3.5 KIP deadline, March 22, if there are no
>> > >>>>>>> objections before then.
>> > >>>>>>>
>> > >>>>>>> Thanks,
>> > >>>>>>> Victoria