[ https://issues.apache.org/jira/browse/KAFKA-12508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17306323#comment-17306323 ]

Christian Maus commented on KAFKA-12508:
----------------------------------------

I'm wondering if emit-on-change tables can be safely used with compacted input 
topics and at-least-once semantics.

Given the following records in the input topic:
timestamp, key, value
1, 1, 1
2, 1, 1
3, 1, 1


The records with ts 1 and 2 are read, then an exception occurs, causing a restart.

While the consuming process restarts, log compaction removes the records with 
ts 1 and 2, leaving only the record with ts 3 in the input topic.

When the topic is consumed again, we will only see the record with ts 3, whose 
timestamp is greater than the greatest ts previously seen for the key.
As its value is the same as the one stored in the table, the output will be suppressed.
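
For concreteness, a minimal sketch of the suppression rule as described above 
(names are hypothetical, not the actual Kafka Streams internals): an update is 
dropped when its value equals the stored value and its timestamp is not older 
than the stored one.

{code:java}
import java.util.Arrays;

// Minimal sketch, assuming the pre-fix rule: suppress when the value is
// unchanged and the new timestamp is not older than the stored timestamp.
// All names here are illustrative, not Kafka Streams API.
final class EmitOnChangeSketch {

    static boolean suppress(byte[] storedValue, long storedTs,
                            byte[] newValue, long newTs) {
        return Arrays.equals(storedValue, newValue) && newTs >= storedTs;
    }

    public static void main(String[] args) {
        byte[] v = {1};
        // The table still holds (value=1, ts=2) from before the crash; after
        // compaction only the record with ts 3 survives in the input topic.
        System.out.println(suppress(v, 2L, v, 3L)); // true -> update is never re-emitted
    }
}
{code}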

Is there a flaw in my thinking, or is this a plausible scenario?

> Emit-on-change tables may lose updates on error or restart in at_least_once
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-12508
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12508
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0, 2.6.1
>            Reporter: Nico Habermann
>            Assignee: Bruno Cadonna
>            Priority: Blocker
>             Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> [KIP-557|https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams]
>  added emit-on-change semantics to KTables, which suppresses updates for 
> duplicate values.
> However, this may cause data loss in at_least_once topologies when records 
> are retried from the last committed offset after an error or restart.
>  
> Consider the following example:
> {code:java}
> streams.table(source, materialized)
>        .toStream()
>        .map(mayThrow())
>        .to(output);
> {code}
>  
>  # Record A gets read
>  # Record A is stored in the table
>  # The update for record A is forwarded through the topology
>  # map() throws (or alternatively, any restart occurs while the forwarded 
> update was still being processed and not yet produced to the output topic)
>  # The stream is restarted and "retries" from the last commit
>  # Record A gets read again
>  # The table will discard the update for record A because
>  ## The value is the same
>  ## The timestamp is the same
>  # Eventually the stream will commit
>  # There is absolutely no output for Record A even though we're running in 
> at_least_once
>  
> This behaviour does not seem intentional. [The emit-on-change logic 
> explicitly forwards records that have the same value and an older 
> timestamp.|https://github.com/apache/kafka/blob/367eca083b44261d4e5fa8aa61b7990a8b35f8b0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L50]
> This logic should probably be changed to also forward updates that have an 
> older *or equal* timestamp.
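
For illustration, a minimal sketch of the comparison after the proposed change 
(a hypothetical helper, not the actual ValueAndTimestampSerializer code): 
duplicates are dropped only when their timestamp is strictly newer, so a 
retried record with an identical value and timestamp is forwarded again.

{code:java}
import java.util.Arrays;

// Minimal sketch, assuming the proposed rule: forward updates whose timestamp
// is older or equal, drop only strictly newer duplicates. Names are illustrative.
final class ProposedRuleSketch {

    static boolean suppress(byte[] storedValue, long storedTs,
                            byte[] newValue, long newTs) {
        return Arrays.equals(storedValue, newValue) && newTs > storedTs;
    }

    public static void main(String[] args) {
        byte[] v = {1};
        // A retried record with the same value and timestamp as the stored one
        // is no longer dropped, restoring at_least_once semantics for record A.
        System.out.println(suppress(v, 1L, v, 1L)); // false -> record A is re-emitted
    }
}
{code}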



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
