Hi everyone,

While wrapping up the implementation for KIP-914, I have discovered that two more DSL processors require semantic updates in the presence of versioned tables:

- The table filter processor has an optimization to drop nulls if the previous filtered value is also null. When the upstream table is versioned, this optimization should be disabled in order to preserve proper version history in the presence of out-of-order data.
- When performing an aggregation over a versioned table, only the latest value by timestamp (per key) should be included in the final aggregate value. This does not happen today in the presence of out-of-order data, due to the way that TableSourceNodes call `get(key)` to determine the "old value" that is removed from the aggregate as part of applying an update. To fix this, aggregations should ignore out-of-order records when aggregating versioned tables.
  - To implement this change, table aggregate processors need a way to determine whether a record is out-of-order. This cannot be done by querying the source table value getter, because that store belongs to a different subtopology (a repartition occurs before the aggregation). As such, an additional timestamp must be included in the repartition topic. The 3.5 release already includes an update to the repartition topic format (with upgrade implications properly handled) via KIP-904 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-904%3A+Kafka+Streams+-+Guarantee+subtractor+is+called+before+adder+if+key+has+not+changed>, so making an additional change to the repartition topic format to add a timestamp comes at no additional cost to users.

I have updated the KIP <https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores> itself with more detail about each of these changes. Please let me know if there are any concerns. In the absence of dissent, I'd like to include these changes along with the rest of KIP-914 in the 3.5 release.
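To make the aggregation problem and its fix concrete, here is a minimal Java sketch. It is a toy in-memory model, not Kafka Streams code: `VersionedAggregationSketch`, `Update`, and the `latestTimestampForKey` field are hypothetical names. It assumes that the extra repartition-topic timestamp lets the aggregate processor compare a record's own timestamp with the latest timestamp for its key in the source table, and that an out-of-order record is forwarded with the latest value (from `get(key)`) as its "old value". Under those assumptions, a naive sum ends up reflecting the out-of-order value, while skipping out-of-order updates keeps only the latest value by timestamp.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of a sum aggregation over a versioned table. NOT Kafka Streams code:
 * all names are hypothetical, and latestTimestampForKey is an assumed stand-in
 * for the extra timestamp added to the repartition topic.
 */
public class VersionedAggregationSketch {

    /** One update as it might arrive at the aggregate processor after repartitioning. */
    public record Update(String groupKey,
                         Integer oldValue,          // value to subtract (null if none)
                         Integer newValue,          // value to add (null for a delete)
                         long recordTimestamp,      // timestamp of this update
                         long latestTimestampForKey // assumed: latest source-table timestamp for the key
    ) {
        public boolean isOutOfOrder() {
            return recordTimestamp < latestTimestampForKey;
        }
    }

    private final Map<String, Integer> sums = new HashMap<>();
    private final boolean ignoreOutOfOrder;

    public VersionedAggregationSketch(boolean ignoreOutOfOrder) {
        this.ignoreOutOfOrder = ignoreOutOfOrder;
    }

    /** Applies the subtractor, then the adder; optionally skips out-of-order updates. */
    public void apply(Update u) {
        if (ignoreOutOfOrder && u.isOutOfOrder()) {
            return; // a newer version is already reflected in the aggregate
        }
        int sum = sums.getOrDefault(u.groupKey(), 0);
        if (u.oldValue() != null) {
            sum -= u.oldValue(); // subtractor
        }
        if (u.newValue() != null) {
            sum += u.newValue(); // adder
        }
        sums.put(u.groupKey(), sum);
    }

    public int sum(String groupKey) {
        return sums.getOrDefault(groupKey, 0);
    }

    public static void main(String[] args) {
        // In-order update: value 10 at t=5, no previous value.
        Update inOrder = new Update("g", null, 10, 5L, 5L);
        // Out-of-order update: value 3 at t=2 arrives afterwards; its "old value"
        // is the latest value (10), as returned by get(key).
        Update outOfOrder = new Update("g", 10, 3, 2L, 5L);

        VersionedAggregationSketch naive = new VersionedAggregationSketch(false);
        VersionedAggregationSketch versioned = new VersionedAggregationSketch(true);
        for (Update u : new Update[] {inOrder, outOfOrder}) {
            naive.apply(u);
            versioned.apply(u);
        }
        System.out.println("naive sum:     " + naive.sum("g"));     // 3, though the latest value is 10
        System.out.println("versioned sum: " + versioned.sum("g")); // 10
    }
}
```

Running `main` shows the naive aggregate dropping to 3 even though the latest value by timestamp is still 10, while the variant that ignores the out-of-order update keeps 10.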

Apologies for not noticing these additional semantics implications earlier,
Victoria

---------- Forwarded message ---------
From: Victoria Xia <victoria....@confluent.io>
Date: Wed, Mar 22, 2023 at 10:08 AM
Subject: Re: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores
To: <dev@kafka.apache.org>

Thanks for voting, everyone!

We have three binding yes votes with no objections during four full days of voting. I will close the vote and mark the KIP as accepted, right in time for the 3.5 release.

Thanks,
Victoria

On Wed, Mar 22, 2023 at 7:11 AM Bruno Cadonna <cado...@apache.org> wrote:
> +1 (binding)
>
> Thanks Victoria!
>
> Best,
> Bruno
>
> On 20.03.23 17:13, Matthias J. Sax wrote:
> > +1 (binding)
> >
> > On 3/20/23 9:05 AM, Guozhang Wang wrote:
> >> +1, thank you Victoria!
> >>
> >> On Sat, Mar 18, 2023 at 8:27 AM Victoria Xia
> >> <victoria....@confluent.io.invalid> wrote:
> >>>
> >>> Hi all,
> >>>
> >>> I'd like to start a vote on KIP-914 for updating the Kafka Streams join
> >>> processors to use proper timestamp-based semantics in applications with
> >>> versioned stores:
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores
> >>>
> >>> To avoid compatibility concerns, I'd like to include the changes from this
> >>> KIP together with KIP-889
> >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores>
> >>> (for introducing versioned stores) in the upcoming 3.5 release. I will
> >>> close the vote on the 3.5 KIP deadline, March 22, if there are no
> >>> objections before then.
> >>>
> >>> Thanks,
> >>> Victoria