Thanks Matthias,

I left a comment on KAFKA-6906 proposing a solution that I think should be
sufficient for both issues.
Guozhang

On Tue, May 15, 2018 at 3:10 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks for reporting this @David!
>
> @Guozhang: I actually think these are two different issues. This one is also
> exposed in a current PR:
> https://github.com/apache/kafka/pull/4912/files#r188179256
>
> I created https://issues.apache.org/jira/browse/KAFKA-6906 for this issue.
>
>
> -Matthias
>
>
>
> On 5/11/18 10:39 AM, Guozhang Wang wrote:
> > Hello David,
> >
> > Thanks for reporting your observations. I agree with you that we should
> > improve the stream task committing mechanism. In fact, there is already a
> > JIRA open, though with a different motivation:
> > https://issues.apache.org/jira/browse/KAFKA-5510
> >
> >
> > Feel free to take on this JIRA and submit a PR.
> >
> > Guozhang
> >
> >
> > On Thu, May 10, 2018 at 1:33 PM, David Chu <david....@appdynamics.com>
> > wrote:
> >
> >> I have a simple Kafka Streams topology of the format “Source Topic ->
> >> Processor -> Sink Topic” that is configured to use exactly-once processing
> >> guarantees.  The Processor performs a reduce operation on the incoming
> >> messages and stores the results in a key-value state store (with logging
> >> enabled).  The contents of the key-value state store are forwarded to the
> >> sink topic by a Punctuator that is scheduled to run every 10 seconds
> >> (using wall-clock time).  It’s also important to note that the commit
> >> interval for the stream application is set to 1 second.  The problem I’m
> >> seeing is that if new records aren’t being consumed off the source topic,
> >> then the Producer#commitTransaction() method is never invoked inside the
> >> StreamTask#commitOffsets() method, since it’s guarded by the
> >> "if (commitOffsetNeeded)" condition: the commitOffsetNeeded field always
> >> appears to be false because no new records are being consumed.  This
> >> becomes problematic in my case because the punctuator can be flushing
> >> items from the state store to the sink topic even when no new records are
> >> being consumed from the source topic.  However, because the transaction is
> >> not being committed, these messages can’t be consumed by downstream
> >> consumers configured with an isolation level of read_committed.  Below is
> >> a more detailed description of the steps that are taking place.
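For reference, the setup described above corresponds roughly to the configuration below. This is a sketch, not David's actual code: the application id, group id, and broker address are hypothetical, and the settings are placed in plain java.util.Properties under their documented key names (processing.guarantee, commit.interval.ms, isolation.level) so the snippet compiles without the Kafka client jars. On Kafka 3.0 and later the value exactly_once is deprecated in favor of exactly_once_v2.

```java
import java.util.Properties;

// Sketch of the configuration described in the thread. Plain string keys are used
// so this compiles without Kafka on the classpath; a real application would pass
// these Properties to KafkaStreams / KafkaConsumer.
public class EosConfigSketch {

    // Streams side: exactly-once processing with a 1 second commit interval.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "reduce-and-punctuate-app"); // hypothetical id
        props.put("bootstrap.servers", "localhost:9092");         // assumed broker address
        props.put("processing.guarantee", "exactly_once");
        props.put("commit.interval.ms", "1000");
        return props;
    }

    // Downstream consumer: only sees records from committed transactions.
    static Properties downstreamConsumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");         // assumed broker address
        props.put("group.id", "sink-topic-reader");               // hypothetical group id
        props.put("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("processing.guarantee"));
        System.out.println(downstreamConsumerProps().getProperty("isolation.level"));
    }
}
```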
> >>
> >> 1. The source topic only has a few messages on it and no new messages are
> >> arriving.
> >> 2. All the messages are consumed by the Processor and the results are
> >> placed into the state store before the first commit occurs at the
> >> 1 second mark.
> >> 3. The first commit occurs at the 1 second mark, causing the
> >> StreamTask#commitOffsets() method to be invoked.  Since messages were
> >> processed as part of step #2, the StreamTask#commitOffsetNeeded field is
> >> set to true, which results in the Producer#commitTransaction() method
> >> being called on line 358.  However, at this point in time, all the
> >> results are still in the state store and have not yet been sent to the
> >> sink topic.
> >> 4. The Punctuator is called at the 10 second mark and it forwards all the
> >> entries in the state store to the sink topic.
> >> 5. The next commit interval elapses.  However, because no new incoming
> >> events have been processed, the StreamTask#commitOffsetNeeded field is now
> >> set to false, so when the StreamTask#commitOffsets() method is invoked it
> >> exits without doing any work.  As a result, the
> >> Producer#commitTransaction() method is never called.
> >> 6. Since the Producer#commitTransaction() method is never called, the
> >> messages that were placed onto the sink topic by the Punctuator in step #4
> >> can never be seen by consumers configured with an isolation level of
> >> read_committed.
> >> 7. Furthermore, calling the ProcessorContext#commit() method within the
> >> Punctuator#punctuate() method does not seem to help, since the
> >> StreamTask#commitOffsets() method does not take the value of the
> >> StreamTask#commitRequested field into consideration.
> >>
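The sequence in steps 1 to 7 can be reproduced with a toy model of the commit gate. This is a simplified sketch, not Kafka's StreamTask: the class and method names are hypothetical, the field names only mirror those mentioned in the thread, and the "open transaction" is modeled as a list of records that become visible to read_committed consumers only when commitTransaction() runs.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the behavior in steps 1-7 (hypothetical class, NOT Kafka's StreamTask).
public class CommitGateModel {
    boolean commitOffsetNeeded = false; // set when an input record is processed
    boolean commitRequested = false;    // set by a ProcessorContext#commit()-style call
    final List<String> openTransaction = new ArrayList<>(); // sent but not yet committed
    final List<String> committed = new ArrayList<>();       // visible to read_committed
    int commitTransactionCalls = 0;

    // Steps 1-2: consuming an input record marks the task as needing a commit.
    void process(String record) {
        commitOffsetNeeded = true;
    }

    // Step 4: the punctuator forwards store contents inside the open transaction.
    void punctuate(String output) {
        openTransaction.add(output);
    }

    // Step 7: the request is recorded but, as described, never consulted below.
    void requestCommit() {
        commitRequested = true;
    }

    // Steps 3 and 5: the gate only looks at commitOffsetNeeded.
    void commitOffsets() {
        if (commitOffsetNeeded) {
            commitTransaction();
            commitOffsetNeeded = false;
        }
    }

    // Commit makes everything in the open transaction visible.
    void commitTransaction() {
        committed.addAll(openTransaction);
        openTransaction.clear();
        commitTransactionCalls++;
    }

    static CommitGateModel runScenario() {
        CommitGateModel task = new CommitGateModel();
        task.process("a");
        task.process("b");            // t < 1 s: input consumed, results kept in the store
        task.commitOffsets();         // t = 1 s: commitTransaction() runs, nothing sent yet
        task.punctuate("reduced-a");
        task.punctuate("reduced-b");  // t = 10 s: punctuator forwards store contents
        task.requestCommit();         // step 7: explicit commit request
        task.commitOffsets();         // next commit: skipped, commitOffsetNeeded is false
        return task;
    }

    public static void main(String[] args) {
        CommitGateModel task = runScenario();
        System.out.println("commitTransaction calls: " + task.commitTransactionCalls); // commitTransaction calls: 1
        System.out.println("pending: " + task.openTransaction); // pending: [reduced-a, reduced-b]
        System.out.println("committed: " + task.committed);     // committed: []
    }
}
```

The punctuated records stay in the open transaction indefinitely, which is exactly why a read_committed consumer never sees them.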
> >> So my question is: would it be beneficial to update the logic in the
> >> StreamTask#commitOffsets() method so that, if ProcessorContext#commit()
> >> has been called and exactly-once processing is enabled, the
> >> Producer#commitTransaction() method is still called even if no records
> >> were consumed off the topic?  This would handle the case where the
> >> punctuate call itself is producing messages to a downstream topic.
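The proposed change amounts to widening the commit condition. Below is a minimal sketch in toy-model form (hypothetical names, not actual Kafka code), assuming that an explicit commit request only forces commitTransaction() when exactly-once is enabled, as the question suggests.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the proposed gate (hypothetical class, NOT Kafka's StreamTask):
// under exactly-once, an explicit commit request also triggers commitTransaction(),
// even if no input records were consumed since the last commit.
public class ProposedCommitGate {
    final boolean eosEnabled;
    boolean commitOffsetNeeded = false; // set when an input record is processed
    boolean commitRequested = false;    // set by a ProcessorContext#commit()-style call
    final List<String> openTransaction = new ArrayList<>(); // sent but not yet committed
    final List<String> committed = new ArrayList<>();       // visible to read_committed

    ProposedCommitGate(boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
    }

    // Proposed condition: input progress OR (exactly-once AND explicit request).
    static boolean shouldCommitTransaction(boolean commitOffsetNeeded,
                                           boolean commitRequested,
                                           boolean eosEnabled) {
        return commitOffsetNeeded || (eosEnabled && commitRequested);
    }

    void process(String record) { commitOffsetNeeded = true; }

    void punctuate(String output) { openTransaction.add(output); }

    void requestCommit() { commitRequested = true; }

    void commitOffsets() {
        if (shouldCommitTransaction(commitOffsetNeeded, commitRequested, eosEnabled)) {
            committed.addAll(openTransaction); // commitTransaction(): make output visible
            openTransaction.clear();
            commitOffsetNeeded = false;
            commitRequested = false;
        }
    }

    public static void main(String[] args) {
        ProposedCommitGate task = new ProposedCommitGate(true);
        task.process("a");
        task.commitOffsets();          // first commit: input was consumed
        task.punctuate("reduced-a");   // punctuator forwards store contents
        task.requestCommit();          // ProcessorContext#commit() from punctuate()
        task.commitOffsets();          // now commits despite no new input
        System.out.println("committed: " + task.committed); // committed: [reduced-a]
    }
}
```

Keeping the check as a separate static predicate makes the condition easy to unit-test on its own; in this sketch the at-least-once path (eosEnabled false) behaves exactly as before.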
> >>
> >> Thanks,
> >> David
> >
>
>


-- 
-- Guozhang
