Thanks for reporting this @David! @Guozhang: I actually think these are two different issues. This one is also exposed in a current PR: https://github.com/apache/kafka/pull/4912/files#r188179256
I created https://issues.apache.org/jira/browse/KAFKA-6906 for this issue.

-Matthias

On 5/11/18 10:39 AM, Guozhang Wang wrote:
> Hello David,
>
> Thanks for reporting your observations. I agree with you that we should
> improve the stream task committing mechanism. In fact, there is already a
> JIRA open, though with a different motivation:
> https://issues.apache.org/jira/browse/KAFKA-5510
>
> Feel free to take on this JIRA and submit a PR.
>
> Guozhang
>
>
> On Thu, May 10, 2018 at 1:33 PM, David Chu <david....@appdynamics.com>
> wrote:
>
>> I have a simple Kafka Streams topology of the form “Source Topic ->
>> Processor -> Sink Topic” that is configured to use exactly-once processing
>> guarantees. The Processor performs a reduce operation on the incoming
>> messages and stores the results in a key-value state store (with logging
>> enabled). The contents of the key-value state store are forwarded to the
>> sink topic by a Punctuator that is scheduled to run every 10 seconds
>> (using wall-clock time). It’s also important to note that the commit
>> interval for the stream application is set to 1 second.
>>
>> The problem I’m seeing is that if no new records are being consumed from
>> the source topic, the Producer#commitTransaction() method is never invoked
>> inside the StreamTask#commitOffsets() method, because it is guarded by the
>> “if (commitOffsetNeeded)” condition, and the commitOffsetNeeded field
>> always appears to be false when no new records are being consumed. This is
>> problematic in my case because the punctuator can be flushing items from
>> the state store to the sink topic even when no new records are being
>> consumed from the source topic. Because the transaction is not committed,
>> these messages can’t be consumed by downstream consumers configured with
>> an isolation level of read_committed. Below is a more detailed description
>> of the steps that are taking place.
>>
>> 1. The source topic only has a few messages on it and no new messages are
>>    arriving.
>> 2. All the messages are consumed by the Processor and the results are
>>    placed into the state store before the first commit occurs at the
>>    1 second mark.
>> 3. The first commit occurs at the 1 second mark, causing the
>>    StreamTask#commitOffsets() method to be invoked. Since messages were
>>    processed in step #2, the StreamTask#commitOffsetNeeded field is set to
>>    true, which results in the Producer#commitTransaction() method being
>>    called on line 358. However, at this point all the results are still in
>>    the state store and have not yet been sent to the sink topic.
>> 4. The Punctuator is called at the 10 second mark and forwards all the
>>    entries in the state store to the sink topic.
>> 5. The next commit interval elapses; however, because no new incoming
>>    events have been processed, the StreamTask#commitOffsetNeeded field is
>>    now false, so when the StreamTask#commitOffsets() method is invoked it
>>    exits without doing any work. As a result, the
>>    Producer#commitTransaction() method is never called.
>> 6. Since the Producer#commitTransaction() method is never called, the
>>    messages that were placed onto the sink topic by the Punctuator in
>>    step #4 can never be seen by consumers configured with an isolation
>>    level of read_committed.
>> 7. Furthermore, calling the ProcessorContext#commit() method within the
>>    Punctuator#punctuate() method does not seem to help, since the
>>    StreamTask#commitOffsets() method does not take the value of the
>>    StreamTask#commitRequested field into consideration.
>>
>> So my question is: would it be beneficial to update the logic in the
>> StreamTask#commitOffsets() method so that, if ProcessorContext#commit()
>> has been called and exactly-once processing is enabled, the
>> Producer#commitTransaction() method is still called even if no records
>> were consumed from the topic? This would help to handle the case where the
>> punctuate call itself is producing messages to a downstream topic.
>>
>> Thanks,
>> David
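For illustration, the sequence in David's steps can be reduced to a toy Java model of the commit decision. This is a minimal sketch, not Kafka's StreamTask code: the class TaskCommitModel, its methods, and the honorCommitRequest switch are hypothetical; exactly-once processing is assumed throughout; and Producer#commitTransaction() is stood in for by moving "pending" sink records to "committed". It compares the behavior David observed with the change he proposes.

```java
/**
 * Hypothetical toy model of the per-task commit decision described in the
 * thread. This is NOT Kafka's StreamTask; field names only mirror the ones
 * David mentions, and exactly-once (transactional) processing is assumed.
 */
public class TaskCommitModel {
    // false = behavior David observed; true = the change he proposes
    private final boolean honorCommitRequest;

    private boolean commitOffsetNeeded; // mirrors StreamTask#commitOffsetNeeded
    private boolean commitRequested;    // mirrors StreamTask#commitRequested
    private int pendingRecords;         // sink records in the open transaction
    private int committedRecords;       // sink records visible to read_committed

    public TaskCommitModel(boolean honorCommitRequest) {
        this.honorCommitRequest = honorCommitRequest;
    }

    // A source record was processed (steps 1-2).
    public void processRecord() {
        commitOffsetNeeded = true;
    }

    // A wall-clock punctuation forwards store entries to the sink (step 4),
    // optionally calling ProcessorContext#commit() (step 7).
    public void punctuate(int recordsForwarded, boolean callContextCommit) {
        pendingRecords += recordsForwarded;
        if (callContextCommit) {
            commitRequested = true;
        }
    }

    // Runs once per commit interval (steps 3 and 5).
    public void commitOffsets() {
        boolean shouldCommit = commitOffsetNeeded
                || (honorCommitRequest && commitRequested);
        if (!shouldCommit) {
            return; // no new input and request ignored: nothing is committed
        }
        // Stand-in for Producer#commitTransaction(): pending output becomes visible.
        committedRecords += pendingRecords;
        pendingRecords = 0;
        commitOffsetNeeded = false;
        commitRequested = false;
    }

    public int committedRecords() { return committedRecords; }

    public int pendingRecords() { return pendingRecords; }

    public static void main(String[] args) {
        for (boolean proposed : new boolean[] {false, true}) {
            TaskCommitModel task = new TaskCommitModel(proposed);
            // Steps 1-2: a few records arrive and are reduced into the store.
            task.processRecord();
            task.processRecord();
            task.processRecord();
            // Step 3: commit at the 1 second mark; nothing forwarded yet.
            task.commitOffsets();
            // Step 4: punctuator at the 10 second mark forwards 3 entries
            // (an arbitrary count) and calls context.commit().
            task.punctuate(3, true);
            // Step 5: next commit interval, with no new input.
            task.commitOffsets();
            System.out.println((proposed ? "proposed" : "current")
                    + ": committed=" + task.committedRecords()
                    + ", pending=" + task.pendingRecords());
        }
    }
}
```

Running main() prints `current: committed=0, pending=3` followed by `proposed: committed=3, pending=0`. Under the current rule the punctuator's output stays in the open transaction, while honoring the commit request makes it visible to read_committed consumers. The model keys the decision off ProcessorContext#commit() as David suggests; another option would be to commit whenever the punctuator has produced records, and this sketch does not choose between the two.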