Thanks Matthias, I left a comment on KAFKA-6906 proposing a solution that I think should be sufficient for both issues.
Guozhang

On Tue, May 15, 2018 at 3:10 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Thanks for reporting this @David!
>
> @Guozhang: I actually think these are two different issues. This is also
> exposed in a current PR:
> https://github.com/apache/kafka/pull/4912/files#r188179256
>
> I created https://issues.apache.org/jira/browse/KAFKA-6906 for this issue.
>
> -Matthias
>
> On 5/11/18 10:39 AM, Guozhang Wang wrote:
> > Hello David,
> >
> > Thanks for reporting your observations. I agree with you that we should
> > improve the stream task committing mechanism. In fact, there is already
> > a JIRA open, though with a different motivation:
> > https://issues.apache.org/jira/browse/KAFKA-5510
> >
> > Feel free to take on this JIRA and submit a PR.
> >
> > Guozhang
> >
> > On Thu, May 10, 2018 at 1:33 PM, David Chu <david....@appdynamics.com>
> > wrote:
> >
> >> I have a simple Kafka Streams topology of the form "Source Topic ->
> >> Processor -> Sink Topic" that is configured to use exactly-once
> >> processing guarantees. The Processor performs a reduce operation on
> >> the incoming messages and stores the results in a key-value state
> >> store (with logging enabled). The contents of the state store are
> >> forwarded to the sink topic by a Punctuator that is scheduled to run
> >> every 10 seconds (using wall-clock time). It's also important to note
> >> that the commit interval for the stream application is set to 1
> >> second. The problem I'm seeing is that if no new records are being
> >> consumed from the source topic, the Producer#commitTransaction()
> >> method is never invoked inside StreamTask#commitOffsets(), because
> >> the call is guarded by the "if (commitOffsetNeeded)" condition, and
> >> the commitOffsetNeeded field always appears to be false while no new
> >> records are being consumed.
> >> This becomes problematic in my case because the punctuator can be
> >> flushing items from the state store to the sink topic even when no
> >> new records are being consumed from the source topic. Because the
> >> transaction is not being committed, these messages can't be consumed
> >> by downstream consumers configured with an isolation level of
> >> read_committed. Below is a more detailed description of the steps
> >> taking place.
> >>
> >> 1. The source topic only has a few messages on it and no new messages
> >>    are arriving.
> >> 2. All the messages are consumed by the Processor, and the results are
> >>    placed into the state store before the first commit occurs at the
> >>    1-second mark.
> >> 3. The first commit occurs at the 1-second mark, invoking the
> >>    StreamTask#commitOffsets() method. Since messages were processed in
> >>    step #2, the StreamTask#commitOffsetNeeded field is true, so the
> >>    Producer#commitTransaction() method is called on line 358. However,
> >>    at this point all the results are still in the state store and have
> >>    not yet been sent to the sink topic.
> >> 4. The Punctuator is called at the 10-second mark and forwards all the
> >>    entries in the state store to the sink topic.
> >> 5. The next commit interval elapses. However, because no new incoming
> >>    events have been processed, the StreamTask#commitOffsetNeeded field
> >>    is now false, so when StreamTask#commitOffsets() is invoked it
> >>    exits without doing any work. As a result,
> >>    Producer#commitTransaction() is never called.
> >> 6. Since Producer#commitTransaction() is never called, the messages
> >>    that the Punctuator placed onto the sink topic in step #4 can never
> >>    be seen by consumers configured with an isolation level of
> >>    read_committed.
> >> 7. Furthermore, calling ProcessorContext#commit() within
> >>    Punctuator#punctuate() does not seem to help, since
> >>    StreamTask#commitOffsets() does not take the value of the
> >>    StreamTask#commitRequested field into consideration.
> >>
> >> So my question is: would it be beneficial to update the logic in
> >> StreamTask#commitOffsets() so that, if ProcessorContext#commit() has
> >> been called and exactly-once processing is enabled,
> >> Producer#commitTransaction() is still called even if no records were
> >> consumed from the topic? This would handle the case where the
> >> punctuate call itself is producing messages to a downstream topic.
> >>
> >> Thanks,
> >> David

--
-- Guozhang
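The commit gating David walks through in steps 1 to 7 can be replayed with a small plain-Java model. This is a sketch under stated assumptions, not the Kafka Streams source: `CommitGateModel`, `runScenario`, and the `honorCommitRequested` switch are hypothetical names invented here, the "transaction" is just two lists, and the real StreamTask#commitOffsets() does more work than this. It only contrasts the reported behavior (commit only when commitOffsetNeeded is true) with David's proposal (also commit when exactly-once is enabled and ProcessorContext#commit() was requested). The fix actually proposed on KAFKA-6906 may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of the commit gating described in
// the thread. Field names mirror the thread (commitOffsetNeeded,
// commitRequested), but this is NOT the Kafka Streams StreamTask code.
final class CommitGateModel {
    private final boolean eosEnabled;           // exactly-once processing enabled
    private final boolean honorCommitRequested; // false: reported behavior; true: proposed change
    private boolean commitOffsetNeeded = false; // set when a source record is processed
    private boolean commitRequested = false;    // set when the punctuator asks for a commit
    private final List<String> pending = new ArrayList<>();   // sent within the open transaction
    private final List<String> committed = new ArrayList<>(); // visible to read_committed consumers

    CommitGateModel(boolean eosEnabled, boolean honorCommitRequested) {
        this.eosEnabled = eosEnabled;
        this.honorCommitRequested = honorCommitRequested;
    }

    // Processing a source record marks offsets as needing a commit.
    // Only the flag matters in this model; the record content is ignored.
    void process(String record) {
        commitOffsetNeeded = true;
    }

    // The punctuator forwards the store contents to the sink topic and may
    // request a commit (standing in for ProcessorContext#commit()).
    void punctuate(List<String> storeContents, boolean requestCommit) {
        pending.addAll(storeContents);
        if (requestCommit) {
            commitRequested = true;
        }
    }

    // Runs once per commit interval.
    void commitOffsets() {
        boolean shouldCommit = commitOffsetNeeded
                || (honorCommitRequested && eosEnabled && commitRequested);
        if (!shouldCommit) {
            return; // reported behavior: exits without doing any work
        }
        // Stands in for Producer#commitTransaction(): pending output becomes visible.
        committed.addAll(pending);
        pending.clear();
        commitOffsetNeeded = false;
        commitRequested = false;
    }

    // Replays steps 1-5 (plus step 7's commit request) and returns what a
    // read_committed consumer could see afterwards.
    static List<String> runScenario(boolean honorCommitRequested) {
        CommitGateModel task = new CommitGateModel(true, honorCommitRequested);
        task.process("a");    // steps 1-2: the few input records are reduced
        task.process("b");    //            into the state store
        task.commitOffsets(); // step 3: commit at 1 s, nothing sent to the sink yet
        task.punctuate(List.of("a:1", "b:1"), true); // step 4, with step 7's commit()
        task.commitOffsets(); // step 5: next commit, no new input processed
        return new ArrayList<>(task.committed);
    }

    public static void main(String[] args) {
        System.out.println("reported: " + runScenario(false)); // reported: []
        System.out.println("proposed: " + runScenario(true));  // proposed: [a:1, b:1]
    }
}
```

In this model, keying the extra commit on commitRequested (instead of committing unconditionally every interval) means an idle task that has nothing to flush still skips the commit, while punctuator output that asked for a commit becomes visible to read_committed consumers.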