Hi Kafka devs,

This morning I observed a Kafka Streams aggregation that ended up with incorrect computed output after a stream thread crashed with an unhandled exception.
The topology is pretty simple -- a single KTable source, a group-by on a field in the value, an aggregation that adds up another field, and output to a topic. (A rough sketch of it is at the end of this e-mail.)

Here's the sequence of events that appears to have occurred:

1. A new record (record A) is received by the source KTable and put in the KTable's RocksDB state store.
2. While processing record A, an exception occurs that prevents producing to Kafka (specific exception at the end of this e-mail).
3. The stream thread throws an unhandled exception and stops.
4. The state stores are flushed and closed. Record A is now in the local state store.
5. The consumer group rebalances.
6. A different thread, in the same process on the same host, picks up the task.
7. The new thread initializes its state store for the KTable; since it's on the same host as the original thread, the store still contains the k/v for record A.
8. The new thread resumes consuming at the last committed offset, which is before record A.
9. When processing record A, the new thread looks up record A's key in the state store and finds the value that was written in step #1.
10. The repartition map therefore receives a Change with both an oldValue and a newValue, and forwards a Change(null, v) and a Change(v, null).
11. The aggregation ends up both subtracting and adding the value of record A, resulting in an incorrect output.

As a result of this sequence, my aggregate output went from 0, to a negative value (after subtracting record A), and back to 0, where it stayed, rather than ending at record A's value.

Does this seem like a feasible series of events? Is this a bug in KS, or is it behavior that maybe can't be improved without exactly-once? I'd think the best behavior would be for writes to the RocksDB state store to be transactional, committing only when the producer commits, but there's a lot of overhead involved in that. (I've also sketched below what I imagine the exactly-once side would look like.)
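In case the concrete shape of it helps, here's roughly what the topology looks like. The topic names, the SourceValue type, and its fields are stand-ins for my real ones:

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class AggregationSketch {
        // Stand-in for my real value type.
        static class SourceValue {
            final String groupField; // the field we group by
            final long sumField;     // the field we add up
            SourceValue(String groupField, long sumField) {
                this.groupField = groupField;
                this.sumField = sumField;
            }
        }

        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();

            // Single KTable source; step #1 writes record A into this
            // table's RocksDB store.
            KTable<String, SourceValue> source = builder.table("source-topic");

            KTable<String, Long> sums = source
                // Group by a field in the value; the repartition map from
                // step #10 sits behind this groupBy.
                .groupBy((key, value) -> KeyValue.pair(value.groupField, value.sumField))
                .aggregate(
                    () -> 0L,                                    // initializer
                    (groupKey, newValue, agg) -> agg + newValue, // adder: handles Change(v, null)
                    (groupKey, oldValue, agg) -> agg - oldValue, // subtractor: handles Change(null, v)
                    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("sum-store"));

            sums.toStream().to("output-topic");
            // (Serde configuration and starting KafkaStreams omitted.)
        }
    }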
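On the exactly-once point, what I'd hope for is that it eventually surfaces as a single knob on the application side. A sketch of what I imagine, assuming a "processing.guarantee"-style config (the names here are my guess, not an API I've verified):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-aggregation-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    // Assumed semantics: the produced output, the state store changelog,
    // and the consumed offsets commit atomically, so a crash can't leave
    // the local store ahead of the last committed offset (the situation
    // in steps #1-#9 above).
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

Mathieu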