Thanks for reporting this @David!

@Guozhang: I actually think these are two different issues. This is also
exposed in a current PR:

I created for this issue.


On 5/11/18 10:39 AM, Guozhang Wang wrote:
> Hello David,
> Thanks for reporting your observations. I agree with you that we should
> improve the stream task commit mechanism. In fact, there is already a
> JIRA open, though with a different motivation:
> Feel free to take on this JIRA and submit a PR.
> Guozhang
> On Thu, May 10, 2018 at 1:33 PM, David Chu <>
> wrote:
>> I have a simple Kafka Streams topology of the format “Source Topic ->
>> Processor -> Sink Topic” that is configured to use exactly-once processing
>> guarantees.  The Processor performs a reduce operation on the incoming
>> messages and stores the results to a key-value state store (with logging
>> enabled).  The contents of the key-value state store are forwarded to the
>> sink topic as part of a Punctuator implementation which is scheduled to run
>> every 10 seconds (using wall clock time).  It’s also important to note that
>> the commit interval for the stream application is set to 1 second.  The
>> problem I’m seeing is that if new records aren’t being consumed off the
>> source topic, then the Producer#commitTransaction() method is never invoked
>> inside the StreamTask#commitOffsets() method since it’s blocked by the “if
>> (commitOffsetNeeded)” condition - the commitOffsetNeeded field always
>> appears to be false since no new records are being consumed.  This becomes
>> problematic in my case because the punctuator can be flushing items from
>> the state store to the sink topic even when no new records are being
>> consumed from the source topic.  However, because the transaction is not
>> being committed these messages can’t be consumed by downstream consumers
>> configured with an isolation level of read_committed.  Below is a more
>> detailed description of the steps which are taking place.
>> 1. The source topic only has a few messages on it and no new messages are
>> arriving.
>> 2. All the messages are consumed by the Processor and the results are
>> placed into the state store before the first commit occurs at the 1 second
>> mark.
>> 3. The first commit occurs at the 1 second mark causing the
>> StreamTask#commitOffsets() method to get invoked.  Since messages were
>> processed as part of step #2 the StreamTask#commitOffsetNeeded field is set
>> to true which results in the Producer#commitTransaction() method being
>> called on line 358.  However, at this point in time, all the results are
>> still in the state store and have not yet been sent to the sink topic.
>> 4. The Punctuator is called at the 10 second mark and it forwards all the
>> entries in the state store to the sink topic.
>> 5. The next commit interval elapses.  However, because no new incoming
>> events have been processed, the StreamTask#commitOffsetNeeded field is now
>> set to false so when the StreamTask#commitOffsets() method is invoked it
>> exits without doing any work.  As a result, the
>> Producer#commitTransaction() method is never called.
>> 6. Since the Producer#commitTransaction() method is never called, the
>> messages which were placed onto the sink topic by the Punctuator in step #4
>> can never be seen by consumers configured with an isolation level of
>> read_committed.
>> 7. Furthermore, calling the ProcessorContext#commit() method within the
>> Punctuator#punctuate() method does not seem to help the situation since the
>> StreamTask#commitOffsets() method does not take into consideration the
>> value of the StreamTask#commitRequested field.
>> So my question is, would it be beneficial to update the logic in the
>> StreamTask#commitOffsets() method so that, if ProcessorContext#commit() has
>> been called and exactly-once processing is enabled, the
>> Producer#commitTransaction() method is still called even if no records were
>> consumed off the topic?  This would help to handle the case where the
>> punctuate call itself is producing messages to a downstream topic.
>> Thanks,
>> David
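
To make the sequence in David's steps concrete, here is a toy model in
plain Java. It is only a sketch under stated assumptions: ToyTask and all
of its fields and methods are hypothetical and merely borrow the names used
in this thread; it is not the actual StreamTask code, and the reset of both
flags at the end of commitOffsets() is an assumption of the model. The
honorCommitRequested flag stands in for the change David proposes.

```java
// Toy model only: every class, field, and method here is hypothetical and
// merely mimics the names used in this thread. This is not Kafka Streams code.
public class CommitGuardSketch {

    static class ToyTask {
        private final boolean honorCommitRequested; // models the proposed change
        private boolean commitOffsetNeeded = false; // set when input records are processed
        private boolean commitRequested = false;    // set by a ProcessorContext#commit()-style call
        private int pendingOutput = 0;              // records written in the open transaction
        private int committedOutput = 0;            // records visible to read_committed consumers

        ToyTask(boolean honorCommitRequested) {
            this.honorCommitRequested = honorCommitRequested;
        }

        // Step 2: consuming a source record marks the offsets as needing a commit.
        void processInputRecord() {
            commitOffsetNeeded = true;
        }

        // Step 4: the punctuator forwards n records to the sink without consuming input.
        void punctuateForward(int n) {
            pendingOutput += n;
        }

        // Step 7: the punctuator asks for a commit.
        void requestCommit() {
            commitRequested = true;
        }

        // Steps 3 and 5: the transaction is committed only if the guard passes.
        void commitOffsets() {
            boolean shouldCommit =
                commitOffsetNeeded || (honorCommitRequested && commitRequested);
            if (shouldCommit) {
                committedOutput += pendingOutput; // models Producer#commitTransaction()
                pendingOutput = 0;
            }
            // Assumption of this model: both flags are cleared after each commit attempt.
            commitOffsetNeeded = false;
            commitRequested = false;
        }

        int committedOutput() { return committedOutput; }

        int pendingOutput() { return pendingOutput; }
    }

    // Replays steps 2 through 7 from the report against one guard variant.
    static ToyTask runScenario(boolean honorCommitRequested) {
        ToyTask task = new ToyTask(honorCommitRequested);
        task.processInputRecord(); // step 2: input consumed, results go to the store
        task.commitOffsets();      // step 3: commit happens, nothing forwarded yet
        task.punctuateForward(3);  // step 4: punctuator writes to the sink topic
        task.requestCommit();      // step 7: commit requested from the punctuator
        task.commitOffsets();      // step 5: next commit interval elapses
        return task;
    }

    public static void main(String[] args) {
        ToyTask current = runScenario(false);
        ToyTask proposed = runScenario(true);
        System.out.println("current guard:  committed=" + current.committedOutput()
            + " pending=" + current.pendingOutput());
        System.out.println("proposed guard: committed=" + proposed.committedOutput()
            + " pending=" + proposed.pendingOutput());
    }
}
```

With the guard as described in the report, the three forwarded records stay
pending (committed=0, pending=3); with the commit request taken into
account, they are committed (committed=3, pending=0).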
