Let's use X for the point in time where commit() is called, and Rs(X) for the records returned by poll() calls made up to time X. At time X, it is not necessarily true that all records in Rs(X) have been sent to Kafka (and acknowledged) and had their offsets flushed to the offset store.

Example
* Time X-1: poll() is called and one record R is returned
* Time X: commit() is called. There is no guarantee that the data in R has been sent to and acknowledged by Kafka, nor that the offset of R has been flushed to the offset store (it is likely, though).

Because of the synchronization it would require, that guarantee is probably hard to make without reducing throughput significantly. But it is feasible to change commit() so that it is given (via an argument) a list/collection of the records for which the guarantee does hold. That's what my current fix does (see PR).
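
To make the idea concrete, here is a minimal, single-threaded sketch of a commit callback that is handed the records that are known to be safe. It is not the code from the PR, and none of the names (Rec, CommitListener, FrameworkSketch, onPolled, onAcked, flushAndCommit) are Kafka Connect API; they are all made up for illustration. It also ignores per-partition ordering and real concurrency.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a source record; NOT Kafka Connect's SourceRecord.
final class Rec {
    final String id;
    boolean acked;          // set once the (simulated) broker has acknowledged the record
    boolean offsetFlushed;  // set once the (simulated) offset-store flush has covered it
    Rec(String id) { this.id = id; }
}

// Hypothetical task-side callback: it is told exactly which records are safe.
interface CommitListener {
    void commit(Collection<Rec> flushedRecords);
}

// Hypothetical framework-side bookkeeping (assumption: one thread, no ordering rules).
final class FrameworkSketch {
    private final List<Rec> polled = new ArrayList<>();

    // Called when poll() has returned a record.
    void onPolled(Rec r) { polled.add(r); }

    // Called when the broker has acknowledged a record.
    void onAcked(Rec r) { r.acked = true; }

    // Flush offsets only for acknowledged records, then report exactly those records.
    void flushAndCommit(CommitListener task) {
        List<Rec> done = new ArrayList<>();
        for (Rec r : polled) {
            if (r.acked) {
                r.offsetFlushed = true;
                done.add(r);
            }
        }
        polled.removeAll(done);  // records not yet acknowledged wait for a later commit
        task.commit(done);
    }
}
```

In this sketch, a record that was polled but not yet acknowledged when flushAndCommit() runs is simply left out of the collection and reported in a later callback, so the task never has to guess what has been committed.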

On 16/10/2018 19.33, Ryanne Dolan wrote:
Steff,

> Guess people have used it, assuming that all records that have been polled
> at the time of callback to "commit", have also had their offsets committed.
> But that is not true.

(excerpt from KIP)

The documentation for SourceTask.commit() reads:

> Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
> method should block until the commit is complete.

I'm confused by these seemingly contradictory statements. My assumption (as you say) is that all records returned by poll() will have been committed before commit() is invoked by the framework. Is that not the case?

Ryanne

On Wed, Oct 10, 2018 at 8:50 AM Per Steffensen <perst...@gmail.com> wrote:

    Please help make the proposed changes in KIP-381 become reality.
    Please comment.

    KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback

    JIRA: https://issues.apache.org/jira/browse/KAFKA-5716

    PR: https://github.com/apache/kafka/pull/3872

    Thanks!


