Hi Matthias,

I've filed KAFKA-5315 (https://issues.apache.org/jira/browse/KAFKA-5315).
Thanks,
Mathieu

On Fri, May 19, 2017 at 3:07 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> Hi Mathieu,
>
> Thanks for reporting this!
>
> This is definitely a bug, and it must get fixed for at-least-once processing, too. Exactly-once is not required to avoid the bug.
>
> Can you open a Jira? I think I already know how to fix it.
>
> -Matthias
>
> On 5/19/17 8:53 AM, Mathieu Fenniak wrote:
>> Whoops, I said I'd put the specific exception at the bottom of the e-mail. It probably isn't the important part of this thread, but it might suggest when this situation can occur. Also of note, this is occurring on Kafka Streams 0.10.2.1.
>>
>> 20:56:07.061 [StreamThread-3] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-3] Failed to commit StreamTask 0_4 state:
>> org.apache.kafka.streams.errors.StreamsException: task [0_4] exception caught when producing
>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
>>     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>> Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
>>
>> On Fri, May 19, 2017 at 9:47 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
>>> Hi Kafka devs,
>>>
>>> This morning I observed a specific Kafka Streams aggregation that ended up with an incorrect computed output after a Kafka Streams thread crashed with an unhandled exception.
>>>
>>> The topology is pretty simple -- a single KTable source, group by a field in the value, an aggregate that adds up another field, and output to a topic.
>>>
>>> Here's the sequence of events that appears to have occurred:
>>>
>>> 1. A new record (record A) is received by the source KTable and put in the KTable's RocksDB state store.
>>>
>>> 2. While processing record A, an exception happens that prevents producing to Kafka (specific exception at end of e-mail).
>>>
>>> 3. The stream thread throws an unhandled exception and stops.
>>>
>>> 4. The state stores are closed and flushed. Record A is now in the local state store.
>>>
>>> 5. The consumer group rebalances.
>>>
>>> 6. A different thread, in the same process, on the same host, picks up the task.
>>>
>>> 7. The new thread initializes its state store for the KTable, but since it's on the same host as the original thread, the store still contains the key/value for record A.
>>>
>>> 8. The new thread resumes consuming at the last committed offset, which is before record A.
>>>
>>> 9. When reprocessing record A, the new thread reads the value that was written to the state store in step #1 under record A's key.
>>>
>>> 10. The repartition map receives a Change with both an oldValue and a newValue, and forwards a Change(null, v) and a Change(v, null).
>>>
>>> 11. The aggregation ends up both subtracting and adding the value of record A, resulting in an incorrect output.
>>>
>>> As a result of this sequence, my aggregate output went from a value of 0, to a negative value (subtracting record A), and back to 0 -- and it stayed there.
>>>
>>> Does this seem like a feasible series of events? Is this a bug in Kafka Streams, or is it behavior that maybe can't be improved without exactly-once? I'd think the best behavior would be for writes to the RocksDB state store to be transactional and only commit when the producer commits, but there's a lot of overhead involved in that.
>>>
>>> Mathieu
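
For reference, here is a minimal sketch of a topology with the shape described in the original report (single KTable source, group by a field in the value, aggregate that sums another field, output to a topic). This is written against a newer Kafka Streams DSL than the 0.10.2.1 release discussed in the thread, and the topic names, store name, and "category:amount" value format are hypothetical illustrations, not taken from the original application.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KGroupedTable;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class AggregationTopologySketch {
        public static StreamsBuilder build() {
            StreamsBuilder builder = new StreamsBuilder();

            // Source KTable; values are assumed to look like "category:amount" (hypothetical format).
            KTable<String, String> source = builder.table(
                    "source-topic",
                    Consumed.with(Serdes.String(), Serdes.String()));

            // Group by a field extracted from the value (the "category" part).
            KGroupedTable<String, Long> grouped = source.groupBy(
                    (key, value) -> {
                        String[] parts = value.split(":");
                        return KeyValue.pair(parts[0], Long.parseLong(parts[1]));
                    },
                    Grouped.with(Serdes.String(), Serdes.Long()));

            // Aggregate that adds the amount for new/updated values and subtracts it for old ones.
            KTable<String, Long> totals = grouped.aggregate(
                    () -> 0L,                                   // initializer
                    (aggKey, newValue, agg) -> agg + newValue,  // adder
                    (aggKey, oldValue, agg) -> agg - oldValue,  // subtractor
                    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("totals-store")
                            .withKeySerde(Serdes.String())
                            .withValueSerde(Serdes.Long()));

            // Output the aggregated totals to a topic.
            totals.toStream().to("totals-topic", Produced.with(Serdes.String(), Serdes.Long()));
            return builder;
        }
    }

The adder/subtractor pair is what makes steps 10-11 above visible: when the KTable source sees what looks like an update (because the state store already contains record A's value), the groupBy repartition forwards both the old and the new value, and the aggregate subtracts one and adds the other, which cancels out instead of counting record A once.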