[ https://issues.apache.org/jira/browse/KAFKA-12508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308063#comment-17308063 ]
John Roesler commented on KAFKA-12508:
--------------------------------------

Thanks [~cmaus], I think you're right. Since this feature is purely an optimization, and since it doesn't seem like we're going to be able to patch it up to avoid these data loss conditions, I'm just going to disable it entirely. I'm confident that we can re-introduce it later and avoid these edge cases, but for now I'd like to err on the side of correctness.

> Emit-on-change tables may lose updates on error or restart in at_least_once
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-12508
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12508
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0, 2.6.1
>            Reporter: Nico Habermann
>            Assignee: John Roesler
>            Priority: Blocker
>             Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> [KIP-557|https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams]
> added emit-on-change semantics to KTables that suppress updates for
> duplicate values.
> However, this may cause data loss in at_least_once topologies when records
> are retried from the last commit due to an error / restart / etc.
>
> Consider the following example:
> {code:java}
> streams.table(source, materialized)
>        .toStream()
>        .map(mayThrow())
>        .to(output){code}
>
> # Record A gets read
> # Record A is stored in the table
> # The update for record A is forwarded through the topology
> # Map() throws (or alternatively, any restart while the forwarded update was
> still being processed and not yet produced to the output topic)
> # The stream is restarted and "retries" from the last commit
> # Record A gets read again
> # The table will discard the update for record A because
> ## The value is the same
> ## The timestamp is the same
> # Eventually the stream will commit
> # There is absolutely no output for Record A even though we're running in
> at_least_once
>
> This behaviour does not seem intentional. [The emit-on-change logic
> explicitly forwards records that have the same value and an older
> timestamp.|https://github.com/apache/kafka/blob/367eca083b44261d4e5fa8aa61b7990a8b35f8b0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L50]
> This logic should probably be changed to also forward updates that have an
> older *or equal* timestamp.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
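
The replay sequence in the quoted issue description can be illustrated with a small, self-contained simulation. This is only a sketch under stated assumptions, not the Kafka Streams implementation: the class `EmitOnChangeSketch`, its `put` and `replay` methods, and the `forwardEqualTimestamp` switch are all hypothetical names introduced here. It assumes the table state survives the restart while the input offset for record A was never committed, and it models the reporter's suggested change (also forwarding equal timestamps) as a flag. The change actually chosen in the comment above is different: the feature is disabled entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of an emit-on-change table; not Kafka Streams code.
class EmitOnChangeSketch {
    // Stored value together with the timestamp of the record that wrote it.
    static final class Stored {
        final String value;
        final long timestamp;

        Stored(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Stored> table = new HashMap<>();
    // Assumption: true models the reporter's suggestion to forward equal timestamps.
    private final boolean forwardEqualTimestamp;

    EmitOnChangeSketch(boolean forwardEqualTimestamp) {
        this.forwardEqualTimestamp = forwardEqualTimestamp;
    }

    // Returns true if the update is forwarded downstream, false if it is suppressed.
    boolean put(String key, String value, long timestamp) {
        Stored old = table.get(key);
        boolean sameValue = old != null && old.value.equals(value);
        boolean suppress = sameValue
                && (forwardEqualTimestamp
                        ? timestamp > old.timestamp    // suppress only strictly newer duplicates
                        : timestamp >= old.timestamp); // also suppress equal timestamps
        if (!suppress) {
            table.put(key, new Stored(value, timestamp));
        }
        return !suppress;
    }

    // Simulates: read A, store A, downstream map() throws, restart, read A again.
    static List<String> replay(boolean forwardEqualTimestamp) {
        EmitOnChangeSketch ktable = new EmitOnChangeSketch(forwardEqualTimestamp);
        List<String> output = new ArrayList<>();
        boolean mapThrows = true; // the first downstream attempt fails
        for (int attempt = 0; attempt < 2; attempt++) {
            // Record A is re-read on the second attempt because its offset was not committed.
            if (ktable.put("A", "v1", 100L)) {
                if (mapThrows) {
                    mapThrows = false; // simulated exception, followed by a restart
                    continue;
                }
                output.add("A=v1");
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(replay(false)); // prints []
        System.out.println(replay(true));  // prints [A=v1]
    }
}
```

With equal timestamps suppressed, the retried record A never reaches the output, which matches the reported data loss. With equal timestamps forwarded, the retry produces the record once.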