If we send the old and new values as two messages, this should work, I guess? Victoria could confirm. But not if we send old/new as a single message when the new key does not change?
-Matthias
On 4/11/23 5:25 AM, Lucas Brutschy wrote:
Hi,

No concerns at all, just a clarifying question from my side: for
detecting out-of-order records, I need both the new and the old timestamp.
I suppose I get it for the new record via the timestamp extractor; can I
not get it the same way for the old record that is passed down to the
aggregation after KIP-904?

Thanks,
Lucas

On Tue, Apr 11, 2023 at 5:35 AM Matthias J. Sax <mj...@apache.org> wrote:
Thanks.

One question: for the repartition topic format change, do we want to
reuse flag=2, or should we introduce flag=3 and determine, when
compiling the DSL into the Topology, whether we need to include the
timestamp, falling back to format version=2 if not, to avoid unnecessary
overhead?
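To picture the "decide at Topology-compile time" option, here is a small Java sketch. It uses a purely hypothetical framing (a version byte, then an 8-byte timestamp only for version 3, then the payload). It is not the actual repartition topic format, and `RepartitionFormatSketch`, `chooseVersion`, `encode`, and `decodeTimestamp` are made-up names for illustration.

```java
import java.nio.ByteBuffer;
import java.util.OptionalLong;

// Hypothetical framing only; NOT the real Kafka Streams repartition topic format.
public class RepartitionFormatSketch {
    static final byte VERSION_WITHOUT_TIMESTAMP = 2;
    static final byte VERSION_WITH_TIMESTAMP = 3;

    // Decided once, when the DSL is compiled into a Topology: include the extra
    // timestamp only if some downstream processor actually needs it.
    static byte chooseVersion(boolean downstreamNeedsTimestamp) {
        return downstreamNeedsTimestamp ? VERSION_WITH_TIMESTAMP : VERSION_WITHOUT_TIMESTAMP;
    }

    // Layout: [version][timestamp, version 3 only][payload]
    static byte[] encode(byte version, long timestamp, byte[] payload) {
        int size = 1 + (version == VERSION_WITH_TIMESTAMP ? Long.BYTES : 0) + payload.length;
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.put(version);
        if (version == VERSION_WITH_TIMESTAMP) {
            buf.putLong(timestamp);
        }
        buf.put(payload);
        return buf.array();
    }

    // Returns the embedded timestamp, or empty for the version without one.
    static OptionalLong decodeTimestamp(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte version = buf.get();
        return version == VERSION_WITH_TIMESTAMP ? OptionalLong.of(buf.getLong()) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        byte[] payload = {42};
        byte[] v2 = encode(chooseVersion(false), 123L, payload);
        byte[] v3 = encode(chooseVersion(true), 123L, payload);
        System.out.println(v2.length + " " + v3.length);      // 2 10
        System.out.println(decodeTimestamp(v3).getAsLong());   // 123
        System.out.println(decodeTimestamp(v2).isPresent());   // false
    }
}
```

In this framing, falling back to version 2 when nothing downstream needs the timestamp saves 8 bytes per record.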


-Matthias

On 4/10/23 5:47 PM, Victoria Xia wrote:
Hi everyone,

While wrapping up the implementation for KIP-914, I have discovered that
two more DSL processors require semantic updates in the presence of
versioned tables:

     - The table filter processor has an optimization to drop nulls if the
     previous filtered value is also null. When the upstream table is versioned,
     this optimization should be disabled in order to preserve proper version
     history in the presence of out-of-order data.
     - When performing an aggregation over a versioned table, only the latest
     value by timestamp (per key) should be included in the final aggregate
     value. This is not happening today in the presence of out-of-order data,
     due to the way that TableSourceNodes call `get(key)` in order to determine
     the "old value" which is to be removed from the aggregate as part of
     applying an update. To fix this, aggregations should ignore out-of-order
     records when aggregating versioned tables.
        - In order to implement this change, table aggregate processors need
        a way to determine whether a record is out-of-order or not. This
        cannot be done by querying the source table value getter, as that
        store belongs to a different subtopology (because a repartition
        occurs before aggregation). As such, an additional timestamp must be
        included in the repartition topic. The 3.5 release already includes
        an update to the repartition topic format (with upgrade implications
        properly handled) via KIP-904
        <https://cwiki.apache.org/confluence/display/KAFKA/KIP-904%3A+Kafka+Streams+-+Guarantee+subtractor+is+called+before+adder+if+key+has+not+changed>,
        so making an additional change to the repartition topic format to add a
        timestamp comes at no additional cost to users.
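To make the two changes above concrete, here is a minimal Java sketch under simplifying assumptions. It is a self-contained model, not the Kafka Streams processor code, and the names `VersionedSemanticsSketch`, `shouldForwardFilterResult`, `Update`, and `LatestOnlySumAggregator` are hypothetical. It assumes each repartitioned update carries the new value and its timestamp plus the old value (as returned by `get(key)` upstream) and that old value's timestamp, and that the grouping key does not change, so old and new arrive together.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the two semantic changes; not Kafka Streams code.
public class VersionedSemanticsSketch {

    // Table filter: the "drop null if the previous filtered value was also null"
    // optimization applies only when the upstream table is NOT versioned.
    static boolean shouldForwardFilterResult(Object newFiltered, Object oldFiltered,
                                             boolean upstreamVersioned) {
        boolean bothNull = newFiltered == null && oldFiltered == null;
        return upstreamVersioned || !bothNull;
    }

    // Hypothetical repartitioned update (oldValue/oldTimestamp are null if there was none).
    record Update(String groupKey, Long newValue, long newTimestamp,
                  Long oldValue, Long oldTimestamp) {}

    // Per-group sum that ignores out-of-order updates, i.e., updates older than
    // the old value they would replace, so only the latest value per key counts.
    static final class LatestOnlySumAggregator {
        private final Map<String, Long> sums = new HashMap<>();

        void apply(Update u) {
            if (u.oldTimestamp() != null && u.newTimestamp() < u.oldTimestamp()) {
                return; // out-of-order: the old value is still the latest by timestamp
            }
            long sum = sums.getOrDefault(u.groupKey(), 0L);
            if (u.oldValue() != null) sum -= u.oldValue(); // subtractor
            if (u.newValue() != null) sum += u.newValue(); // adder
            sums.put(u.groupKey(), sum);
        }

        long get(String groupKey) {
            return sums.getOrDefault(groupKey, 0L);
        }
    }

    public static void main(String[] args) {
        LatestOnlySumAggregator agg = new LatestOnlySumAggregator();
        agg.apply(new Update("g", 10L, 100L, null, null)); // first value for the source key
        agg.apply(new Update("g", 7L, 50L, 10L, 100L));    // out-of-order: ignored
        agg.apply(new Update("g", 12L, 200L, 10L, 100L));  // in-order: replaces 10
        System.out.println(agg.get("g"));                                  // 12
        System.out.println(shouldForwardFilterResult(null, null, false));  // false
        System.out.println(shouldForwardFilterResult(null, null, true));   // true
    }
}
```

The update at timestamp 50 is dropped because the old value it would replace (timestamp 100) is newer, so the aggregate reflects only the latest value by timestamp. Without the check, the subtractor would remove 10 and the adder would add 7, even though 10 is still the current value.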


I have updated the KIP
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores>
itself with more detail about each of these changes. Please let me know if
there are any concerns. In the absence of dissent, I'd like to include
these changes along with the rest of KIP-914 in the 3.5 release.

Apologies for not noticing these additional semantic implications earlier,
Victoria

---------- Forwarded message ---------
From: Victoria Xia <victoria....@confluent.io>
Date: Wed, Mar 22, 2023 at 10:08 AM
Subject: Re: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores
To: <dev@kafka.apache.org>


Thanks for voting, everyone! We have three binding yes votes with no
objections during four full days of voting. I will close the vote and mark
the KIP as accepted, right in time for the 3.5 release.

Thanks,
Victoria

On Wed, Mar 22, 2023 at 7:11 AM Bruno Cadonna <cado...@apache.org> wrote:

+1 (binding)

Thanks Victoria!

Best,
Bruno

On 20.03.23 17:13, Matthias J. Sax wrote:
+1 (binding)

On 3/20/23 9:05 AM, Guozhang Wang wrote:
+1, thank you Victoria!

On Sat, Mar 18, 2023 at 8:27 AM Victoria Xia
<victoria....@confluent.io.invalid> wrote:
Hi all,

I'd like to start a vote on KIP-914 for updating the Kafka Streams join
processors to use proper timestamp-based semantics in applications with
versioned stores:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores
To avoid compatibility concerns, I'd like to include the changes from this
KIP together with KIP-889
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores>
(for introducing versioned stores) in the upcoming 3.5 release. I will
close the vote on the 3.5 KIP deadline, March 22, if there are no
objections before then.

Thanks,
Victoria
