Hello David,

Thanks for reporting your observations. I agree that we should improve the stream task committing mechanism. In fact, there is already a JIRA open, though for a different motivation: https://issues.apache.org/jira/browse/KAFKA-5510
Feel free to take on this JIRA and submit a PR.

Guozhang

On Thu, May 10, 2018 at 1:33 PM, David Chu <david....@appdynamics.com> wrote:

> I have a simple Kafka Streams topology of the form "Source Topic ->
> Processor -> Sink Topic" that is configured to use exactly-once processing
> guarantees. The Processor performs a reduce operation on the incoming
> messages and stores the results in a key-value state store (with logging
> enabled). The contents of the key-value state store are forwarded to the
> sink topic by a Punctuator implementation that is scheduled to run every
> 10 seconds (using wall-clock time). It's also important to note that the
> commit interval for the Streams application is set to 1 second.
>
> The problem I'm seeing is that if new records aren't being consumed from
> the source topic, the Producer#commitTransaction() method is never invoked
> inside the StreamTask#commitOffsets() method, since it's guarded by the
> "if (commitOffsetNeeded)" condition: the commitOffsetNeeded field always
> appears to be false because no new records are being consumed. This is a
> problem in my case because the punctuator can flush items from the state
> store to the sink topic even when no new records are being consumed from
> the source topic. Because the transaction is never committed, downstream
> consumers configured with an isolation level of read_committed can't
> consume these messages. Below is a more detailed description of the steps
> that take place.
>
> 1. The source topic has only a few messages on it, and no new messages are
>    arriving.
> 2. All the messages are consumed by the Processor, and the results are
>    placed into the state store before the first commit occurs at the
>    1 second mark.
> 3. The first commit occurs at the 1 second mark, causing the
>    StreamTask#commitOffsets() method to be invoked.
>    Since messages were processed as part of step #2, the
>    StreamTask#commitOffsetNeeded field is set to true, which results in the
>    Producer#commitTransaction() method being called on line 358. However,
>    at this point all the results are still in the state store and have not
>    yet been sent to the sink topic.
> 4. The Punctuator is called at the 10 second mark and forwards all the
>    entries in the state store to the sink topic.
> 5. The next commit interval elapses. However, because no new incoming
>    events have been processed, the StreamTask#commitOffsetNeeded field is
>    now false, so when the StreamTask#commitOffsets() method is invoked it
>    exits without doing any work. As a result, the
>    Producer#commitTransaction() method is never called.
> 6. Since the Producer#commitTransaction() method is never called, the
>    messages that the Punctuator placed onto the sink topic in step #4 can
>    never be seen by consumers configured with an isolation level of
>    read_committed.
> 7. Furthermore, calling the ProcessorContext#commit() method within the
>    Punctuator#punctuate() method does not seem to help, since the
>    StreamTask#commitOffsets() method does not take the value of the
>    StreamTask#commitRequested field into consideration.
>
> So my question is: would it be beneficial to update the logic in the
> StreamTask#commitOffsets() method so that, if ProcessorContext#commit()
> has been called and exactly-once processing is enabled, the
> Producer#commitTransaction() method is still called even if no records
> were consumed from the topic? This would handle the case where the
> punctuate call itself is producing messages to a downstream topic.
>
> Thanks,
> David
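To make the sequence in steps 3 to 7 concrete, here is a heavily simplified, self-contained Java model of the commit decision. It is a sketch under stated assumptions, not Kafka's actual StreamTask code: only the names commitOffsetNeeded, commitRequested, ProcessorContext#commit() and Producer#commitTransaction() come from the thread, while the class CommitDecisionModel, its methods, and the record counts are hypothetical. The flag honorCommitRequested switches between the behaviour reported above and the change proposed in the question.

```java
/**
 * Hypothetical, heavily simplified model of the StreamTask commit decision
 * described in the report. It is NOT Kafka's StreamTask code: only the field
 * names commitOffsetNeeded and commitRequested come from the thread; the class,
 * its methods and the record counts are invented for illustration.
 */
class CommitDecisionModel {

    private final boolean eosEnabled;
    // false = behaviour as reported; true = the change proposed in the question
    private final boolean honorCommitRequested;

    private boolean commitOffsetNeeded; // set when an input record is processed
    private boolean commitRequested;    // set when ProcessorContext#commit() is called
    private int pendingSends;           // sink records written in the open transaction
    private int committedSends;         // sink records visible to read_committed consumers

    CommitDecisionModel(boolean eosEnabled, boolean honorCommitRequested) {
        this.eosEnabled = eosEnabled;
        this.honorCommitRequested = honorCommitRequested;
    }

    /** An input record is reduced into the state store; nothing is sent yet. */
    void process() {
        commitOffsetNeeded = true;
    }

    /** The punctuator forwards store entries to the sink and calls commit(). */
    void punctuate(int forwardedEntries) {
        pendingSends += forwardedEntries;
        commitRequested = true;
    }

    /** A commit-interval tick; returns true if commitTransaction() would run. */
    boolean commitIntervalElapsed() {
        boolean commit = commitOffsetNeeded
                || (honorCommitRequested && eosEnabled && commitRequested);
        if (commit) {
            committedSends += pendingSends; // transaction commits, sends become visible
            pendingSends = 0;
            commitOffsetNeeded = false;
            commitRequested = false;
        }
        return commit;
    }

    int visibleToReadCommitted() {
        return committedSends;
    }

    /** Replays steps 1-6 with three hypothetical input records. */
    static int replay(boolean honorCommitRequested) {
        CommitDecisionModel task = new CommitDecisionModel(true, honorCommitRequested);
        for (int i = 0; i < 3; i++) {
            task.process();           // steps 1-2: input reduced into the store
        }
        task.commitIntervalElapsed(); // step 3: commits, but nothing was sent yet
        task.punctuate(3);            // step 4: punctuator forwards the store contents
        task.commitIntervalElapsed(); // step 5: the next commit interval
        return task.visibleToReadCommitted();
    }

    public static void main(String[] args) {
        System.out.println("reported: " + replay(false)); // 0: records stuck uncommitted
        System.out.println("proposed: " + replay(true));  // 3: records become visible
    }
}
```

Running main prints 0 for the reported behaviour and 3 for the proposed one. The model deliberately ignores offsets, multiple tasks, and producer errors; it only isolates why a commit request made from the punctuator is lost when commitOffsetNeeded is false.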
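For reference, the setup described in the report can also be summarised as configuration. This sketch uses plain java.util.Properties so it compiles without the Kafka jars; processing.guarantee, commit.interval.ms and isolation.level are standard Kafka config keys, while the class name, application id, broker address and group id are hypothetical placeholders. The 10 second wall-clock punctuator is not a config setting; it is scheduled from the Processor via ProcessorContext#schedule().

```java
import java.util.Properties;

/**
 * Sketch of the settings described in the report. Config keys are standard
 * Kafka / Kafka Streams names; the class name, application id, broker address
 * and group id are hypothetical placeholders.
 */
class ReportedSetupConfig {

    /** Streams application: exactly-once with a 1 second commit interval. */
    static Properties streamsProps() {
        Properties p = new Properties();
        p.put("application.id", "reduce-and-punctuate"); // hypothetical
        p.put("bootstrap.servers", "localhost:9092");    // hypothetical
        p.put("processing.guarantee", "exactly_once");   // 2018-era value; newer releases use exactly_once_v2
        p.put("commit.interval.ms", "1000");             // commit every 1 second
        return p;
    }

    /** Downstream consumer that only sees records from committed transactions. */
    static Properties downstreamConsumerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");    // hypothetical
        p.put("group.id", "sink-reader");                // hypothetical
        p.put("isolation.level", "read_committed");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps());
        System.out.println(downstreamConsumerProps());
    }
}
```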