Mathieu Fenniak created KAFKA-5315:
--------------------------------------
Summary: Streams exception w/ partially processed record corrupts state store
Key: KAFKA-5315
URL: https://issues.apache.org/jira/browse/KAFKA-5315
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.10.2.1
Reporter: Mathieu Fenniak
When a Kafka Streams KTable processes a topic record, the record is inserted
into the state store before being forwarded to downstream processors. It can
therefore remain in the state store even if downstream processing fails with an
exception, and the persisted record can then affect any later attempt to resume
processing after that exception.
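The write-before-forward ordering described above can be sketched with a small in-memory model. This is a hypothetical illustration, not the Kafka Streams Processor API: the class KTableSource, its plain-map "store", and the BiConsumer standing in for downstream processors are all assumptions made for the example. It shows only that a failure in the downstream call leaves the earlier store write in place.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical model of a KTable source node; NOT the Kafka Streams Processor API.
public class PutBeforeForward {

    static final class KTableSource {
        // Stand-in for the local RocksDB state store (assumption: a plain in-memory map).
        final Map<String, Integer> store = new HashMap<>();

        // Updates the store first, then forwards (oldValue, newValue) downstream.
        void process(String key, int newValue, BiConsumer<Integer, Integer> downstream) {
            Integer oldValue = store.get(key);
            store.put(key, newValue);              // the write happens here...
            downstream.accept(oldValue, newValue); // ...before downstream work that may throw
        }
    }

    public static void main(String[] args) {
        KTableSource source = new KTableSource();
        try {
            source.process("A", 5, (oldValue, newValue) -> {
                // Simulated producer failure, standing in for the report's TimeoutException.
                throw new RuntimeException("Failed to update metadata after 60000 ms");
            });
        } catch (RuntimeException e) {
            System.out.println("downstream failed: " + e.getMessage());
        }
        // The write is still present although record A was never fully processed.
        System.out.println("store after failure: " + source.store); // {A=5}
    }
}
```

Because the exception escapes after store.put has already run, nothing in this model rolls the write back; in the real system the report attributes the surviving entry to the store being flushed on close.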
Example sequence of events in a simple topology (a single KTable source, grouped
by a field in the value, aggregated by summing another field, with the result
written to a topic):
1. A new record (record A) is received by the source KTable, and put in the
KTable RocksDB state store.
2. While record A is being processed, an exception prevents producing to Kafka
(e.g., a TimeoutException: "Failed to update metadata after 60000 ms").
3. The stream thread throws an unhandled exception and stops.
4. The state stores are closed and flushed. Record A is now in the local state
store.
5. The consumer group rebalances.
6. A different thread, in the same process, on the same host, picks up the task.
7. New thread initializes its state store for the KTable, but it's on the same
host as the original thread, so it still contains the k/v for record A.
8. New thread resumes consuming at the last committed offset, which is before
record A.
9. When processing record A, the new thread looks up record A's key in the state
store and reads the value that record A itself wrote there in step #1.
10. The repartition map receives a Change whose oldValue and newValue are both
record A's value (v), and forwards a Change(null, v) and a Change(v, null).
11. The aggregation ends up both subtracting and adding the value of record A,
resulting in incorrect output that persists.
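Steps 7 to 11 can be modeled to show why the stale oldValue matters. This is a hypothetical sketch, not Kafka Streams code: the names StaleOldValueReplay, replay, and repartition are invented for illustration, Change follows the report's Change(newValue, oldValue) argument order (assumed from the report's notation), and the aggregate is assumed to start without record A's contribution because producing A failed in step 2.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of steps 7-11; names are illustrative, NOT Kafka Streams APIs.
public class StaleOldValueReplay {

    // Assumed argument order Change(newValue, oldValue); null means "absent".
    static final class Change {
        final Integer newValue;
        final Integer oldValue;

        Change(Integer newValue, Integer oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Per-group sums; assumed NOT to include record A, since producing A failed in step 2.
    final Map<String, Integer> sums = new HashMap<>();

    // Aggregation: subtract the old value, add the new value.
    void aggregate(String group, Change c) {
        int sum = sums.getOrDefault(group, 0);
        if (c.oldValue != null) sum -= c.oldValue;
        if (c.newValue != null) sum += c.newValue;
        sums.put(group, sum);
    }

    // Repartition map: splits one Change into a subtraction and an addition.
    void repartition(String group, Change c) {
        if (c.oldValue != null) aggregate(group, new Change(null, c.oldValue));
        if (c.newValue != null) aggregate(group, new Change(c.newValue, null));
    }

    // Source step on restart: read the previous value for the key, write the new one, forward.
    static int replay(Map<String, Integer> sourceStore, String key, String group, int value) {
        StaleOldValueReplay agg = new StaleOldValueReplay();
        Integer old = sourceStore.get(key); // step 9: may be record A's own earlier write
        sourceStore.put(key, value);
        agg.repartition(group, new Change(value, old));
        return agg.sums.get(group);
    }

    public static void main(String[] args) {
        Map<String, Integer> clean = new HashMap<>();    // store without the partial write
        Map<String, Integer> leftover = new HashMap<>();
        leftover.put("A", 5);                            // step 7: record A survived the crash
        System.out.println("clean restart: " + replay(clean, "A", "g", 5));    // 5
        System.out.println("stale restart: " + replay(leftover, "A", "g", 5)); // 0
    }
}
```

Under these assumptions the clean restart adds A once, while the stale restart subtracts and then re-adds the same value, so A's contribution never appears (and an intermediate sum of -5 is produced along the way).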
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)