Mathieu Fenniak created KAFKA-5315:
--------------------------------------

             Summary: Streams exception w/ partially processed record corrupts state store
                 Key: KAFKA-5315
                 URL: https://issues.apache.org/jira/browse/KAFKA-5315
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.2.1
            Reporter: Mathieu Fenniak


When a Kafka Streams KTable processes a record from its source topic, the record
is inserted into the state store before being forwarded to downstream
processors, and it may remain in the state store even if downstream processing
fails due to an exception. The persisted state store record can then affect any
later attempt to restart processing after the exception.
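The write-then-forward ordering can be illustrated with a minimal plain-Java sketch. It does not use the Kafka Streams API: `KTableSourceSketch`, its `Downstream` interface, and the in-memory `HashMap` standing in for the RocksDB store are hypothetical simplifications, and the exception message only imitates the TimeoutException from the example that follows.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a KTable source processor.
// This is NOT the real Kafka Streams KTableSource implementation.
public class KTableSourceSketch {
    // Stand-in for the RocksDB-backed KTable state store (assumption).
    final Map<String, Integer> store = new HashMap<>();
    // Downstream step; receives (key, newValue, oldValue).
    final Downstream downstream;

    interface Downstream {
        void forward(String key, Integer newValue, Integer oldValue);
    }

    KTableSourceSketch(Downstream downstream) {
        this.downstream = downstream;
    }

    void process(String key, Integer value) {
        Integer oldValue = store.get(key);
        store.put(key, value);                    // 1. the store is written first
        downstream.forward(key, value, oldValue); // 2. forwarding happens afterwards and may throw
    }

    public static void main(String[] args) {
        // Simulate a downstream failure (e.g. the producer cannot send).
        KTableSourceSketch source = new KTableSourceSketch((k, newV, oldV) -> {
            throw new RuntimeException("Failed to update metadata after 60000 ms");
        });
        try {
            source.process("A", 5);
        } catch (RuntimeException e) {
            System.out.println("processing failed: " + e.getMessage());
        }
        // The write from step 1 is still present after the failed forward.
        System.out.println("store after failure: " + source.store);
    }
}
```

Running `main` prints the failure message and then `store after failure: {A=5}`: in this model nothing rolls back the store write when forwarding fails.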

A specific sequence of events in a simple topology (a single KTable source,
grouped by a field in the value, aggregated by summing another field, and
written to an output topic):

1. A new record (record A) is received by the source KTable, and put in the 
KTable RocksDB state store.
2. While record A is being processed, an exception prevents producing to Kafka
(e.g., a TimeoutException: "Failed to update metadata after 60000 ms").
3. The stream thread throws an unhandled exception and stops.
4. The state stores are closed and flushed.  Record A is now in the local state 
store.
5. The consumer group rebalances.
6. A different thread, in the same process, on the same host, picks up the task.
7. The new thread initializes its state store for the KTable, but because it is
on the same host as the original thread, the store still contains the k/v for
record A.
8. The new thread resumes consuming at the last committed offset, which is
before record A.
9. When processing record A again, the new thread looks up record A's key in
the state store and reads the value that was written there in step 1.
10. The repartition map receives a Change with both an oldValue and a
newValue, and forwards both a Change(null, v) and a Change(v, null), i.e. a
subtraction of the old value and an addition of the new value.
11. The aggregation ends up both subtracting and adding the value of record A,
resulting in incorrect and persistent output.
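The steps above can be replayed in a small, self-contained Java model. This is a hypothetical simplification, not Kafka Streams code: `ReplaySketch`, `Value`, the `producerDown` flag, and the direct call from `repartition` into `aggregate` (which in the real topology goes through a repartition topic) are all assumptions made for illustration. The key `A`, group `g1`, and amount `5` are made-up values, and the model assumes the first attempt's forward never reaches the aggregation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the KAFKA-5315 replay.
// None of these classes are Kafka Streams APIs.
public class ReplaySketch {
    static final class Value {
        final String group; // field the table is grouped by
        final int amount;   // field the aggregation sums
        Value(String group, int amount) { this.group = group; this.amount = amount; }
    }

    // Source KTable store; survives the crash because the task stays on the same host.
    final Map<String, Value> sourceStore = new HashMap<>();
    // Aggregation store: running sum of `amount` per group.
    final Map<String, Integer> sums = new HashMap<>();
    // When true, forwarding to the repartition topic fails (simulated timeout).
    boolean producerDown = false;

    void process(String key, Value newValue) {
        Value oldValue = sourceStore.get(key);
        sourceStore.put(key, newValue); // written before forwarding (steps 1 and 9)
        repartition(newValue, oldValue);
    }

    // Models the repartition map: a change carrying both values is split into
    // a subtraction of the old value and an addition of the new value (step 10).
    void repartition(Value newValue, Value oldValue) {
        if (producerDown) {
            throw new RuntimeException("Failed to update metadata after 60000 ms");
        }
        if (oldValue != null) aggregate(oldValue, -1); // like Change(null, old)
        if (newValue != null) aggregate(newValue, +1); // like Change(new, null)
    }

    void aggregate(Value v, int sign) {
        sums.merge(v.group, sign * v.amount, Integer::sum);
    }

    public static void main(String[] args) {
        ReplaySketch task = new ReplaySketch();
        Value a = new Value("g1", 5);

        task.producerDown = true; // steps 1-3: the first attempt fails after the store write
        try {
            task.process("A", a);
        } catch (RuntimeException e) {
            System.out.println("thread died: " + e.getMessage());
        }

        task.producerDown = false; // steps 5-8: new thread, same local store
        task.process("A", a);      // steps 9-11: replay from the committed offset

        // Expected sum for g1 is 5, but subtract-then-add nets to 0.
        System.out.println("sum for g1 = " + task.sums.get("g1"));
    }
}
```

Under these assumptions the failed first attempt leaves `A` in `sourceStore` without ever touching `sums`; the replay then subtracts and adds 5, so `main` prints `sum for g1 = 0` instead of 5.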



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
