Hi Matthias,

I've filed KAFKA-5315 (https://issues.apache.org/jira/browse/KAFKA-5315).

Thanks,

Mathieu


On Fri, May 19, 2017 at 3:07 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> Hi Mathieu,
>
> Thanks for reporting this!
>
> This is definitely a bug and it must get fixed for at-least-once
> processing, too. Exactly-once is not required to avoid the bug.
>
> Can you open a Jira? I think I already know how to fix it.
>
>
> -Matthias
>
>
> On 5/19/17 8:53 AM, Mathieu Fenniak wrote:
>> Whoops, I said I'd put the specific exception at the bottom of the
>> e-mail, but left it out.  It probably isn't the important part of this
>> thread, but it might suggest when this situation can occur.  Also of
>> note, this is occurring on Kafka Streams 0.10.2.1.
>>
>>
>> 20:56:07.061 [StreamThread-3] ERROR o.a.k.s.p.internals.StreamThread -
>> stream-thread [StreamThread-3] Failed to commit StreamTask 0_4 state:
>> org.apache.kafka.streams.errors.StreamsException: task [0_4] exception
>> caught when producing
>>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:121)
>>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:129)
>>         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76)
>>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>>         at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>>         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>> Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to
>> update metadata after 60000 ms.
>>
>> On Fri, May 19, 2017 at 9:47 AM, Mathieu Fenniak
>> <mathieu.fenn...@replicon.com> wrote:
>>> Hi Kafka devs,
>>>
>>> This morning I observed a specific Kafka Streams aggregation that
>>> ended up with an incorrect computed output after a Kafka Streams
>>> thread crashed with an unhandled exception.
>>>
>>> The topology is pretty simple -- a single KTable source, group by a
>>> field in the value, aggregate that adds up another field, output to a
>>> topic.
>>>
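>>> For concreteness, here's a rough sketch of that topology using the
>>> 0.10.2-era API.  The topic, store, and field names, the MyRecord type,
>>> and its serde are hypothetical placeholders, not my actual code:
>>>
>>>     import org.apache.kafka.common.serialization.Serdes;
>>>     import org.apache.kafka.streams.KeyValue;
>>>     import org.apache.kafka.streams.kstream.KStreamBuilder;
>>>     import org.apache.kafka.streams.kstream.KTable;
>>>
>>>     KStreamBuilder builder = new KStreamBuilder();
>>>
>>>     // Single KTable source, backed by a RocksDB state store.
>>>     KTable<String, MyRecord> source =
>>>         builder.table(Serdes.String(), myRecordSerde, "source-topic", "source-store");
>>>
>>>     source
>>>         // Group by a field in the value (repartitions on that field).
>>>         .groupBy((key, value) -> KeyValue.pair(value.getGroupField(), value),
>>>                  Serdes.String(), myRecordSerde)
>>>         // Aggregate that adds up another field; a KTable aggregation
>>>         // needs both an adder and a subtractor to handle updates.
>>>         .aggregate(
>>>             () -> 0L,                                           // initializer
>>>             (groupKey, value, agg) -> agg + value.getAmount(),  // adder
>>>             (groupKey, value, agg) -> agg - value.getAmount(),  // subtractor
>>>             Serdes.Long(),
>>>             "aggregate-store")
>>>         // Output to a topic.
>>>         .to(Serdes.String(), Serdes.Long(), "output-topic");
>>>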
>>> Here's the sequence of events that appears to have occurred:
>>>
>>> 1. A new record (record A) is received by the source KTable, and put
>>> in the KTable RocksDB state store.
>>>
>>> 2. While processing record A, an exception occurs that prevents
>>> producing to Kafka (specific exception at the end of this e-mail).
>>>
>>> 3. The stream thread throws an unhandled exception and stops.
>>>
>>> 4. The state stores are closed and flushed.  Record A is now in the
>>> local state store.
>>>
>>> 5. The consumer group rebalances.
>>>
>>> 6. A different thread, in the same process, on the same host, picks up the 
>>> task.
>>>
>>> 7. The new thread initializes its state store for the KTable, but since
>>> it's on the same host as the original thread, the store still contains
>>> the k/v for record A.
>>>
>>> 8. The new thread resumes consuming from the last committed offset,
>>> which is before record A.
>>>
>>> 9. When processing record A, the new thread reads, by record A's key,
>>> the value that was written to the state store in step #1.
>>>
>>> 10. The repartition map receives a Change with both an oldValue and a
>>> newValue, and forwards a Change(null, v) and a Change(v, null).
>>>
>>> 11. The aggregation ends up both subtracting and adding the value of
>>> record A, resulting in an incorrect output.
>>>
>>> As a result of this sequence, my aggregate output went from a value of
>>> 0, to negative (subtracting record A), to 0.  And stayed there.
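>>>
>>> To make that arithmetic concrete (using the hypothetical adder/subtractor
>>> from the sketch above, and assuming record A carries an amount of 10):
>>>
>>>     long agg = 0L;        // committed aggregate value before the crash
>>>     // Re-processing record A yields Change(newValue = A, oldValue = A),
>>>     // because A's old value was already sitting in the state store, so
>>>     // the repartition forwards both halves to the aggregation:
>>>     agg = agg - 10L;      // Change(null, oldValue) -> subtractor; agg == -10
>>>     agg = agg + 10L;      // Change(newValue, null) -> adder;      agg == 0
>>>     // Expected result was a single add (agg == 10); instead the output
>>>     // settles back at 0.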
>>>
>>> Does this seem like a feasible series of events?  Is this a bug in KS,
>>> or is it behavior that maybe can't be improved without exactly-once?
>>> I'd think the best behavior would be for writes to the RocksDB state
>>> store to be transactional and only commit when the producer commits,
>>> but there's a lot of overhead involved in that.
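>>>
>>> A very rough sketch of the kind of thing I mean (purely conceptual --
>>> this wrapper is not an existing Kafka Streams API, just an illustration
>>> of buffering writes until the commit succeeds):
>>>
>>>     import java.util.HashMap;
>>>     import java.util.Map;
>>>     import org.apache.kafka.streams.state.KeyValueStore;
>>>
>>>     // Hypothetical wrapper: writes stay in an in-memory buffer and only
>>>     // reach the underlying RocksDB-backed store after a successful commit.
>>>     class BufferedStore<K, V> {
>>>         private final Map<K, V> pending = new HashMap<>();
>>>         private final KeyValueStore<K, V> inner;
>>>
>>>         BufferedStore(KeyValueStore<K, V> inner) {
>>>             this.inner = inner;
>>>         }
>>>
>>>         void put(K key, V value) {
>>>             pending.put(key, value);            // not yet visible in RocksDB
>>>         }
>>>
>>>         V get(K key) {
>>>             return pending.containsKey(key) ? pending.get(key) : inner.get(key);
>>>         }
>>>
>>>         void onCommitSucceeded() {              // called after the producer commit
>>>             pending.forEach(inner::put);
>>>             pending.clear();
>>>         }
>>>
>>>         void onFailure() {                      // crash/rebalance before commit
>>>             pending.clear();                    // record A never reaches the store
>>>         }
>>>     }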
>>>
>>> Mathieu
>
