[
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644987#comment-16644987
]
Per Steffensen commented on KAFKA-5716:
---------------------------------------
Created KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-381%3A+Connect%3A+Tell+about+records+that+had+their+offsets+flushed+in+callback
> Connect: When SourceTask.commit it is possible not everything from
> SourceTask.poll has been sent
> -----------------------------------------------------------------------------------------------
>
> Key: KAFKA-5716
> URL: https://issues.apache.org/jira/browse/KAFKA-5716
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Per Steffensen
> Assignee: Per Steffensen
> Priority: Minor
> Attachments: KAFKA-5716.patch
>
>
> Not looking at the very latest code, so the "problem" may have been corrected
> recently. If so, I apologize. I found the "problem" by code-inspection alone,
> so I may be wrong. Have not had the time to write tests to confirm.
> According to java-doc on SourceTask.commit
> {quote}
> Commit the offsets, up to the offsets that have been returned by {@link #poll()}.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect
> will record offsets automatically. This hook is provided for systems that
> also need to store offsets internally in their own system.
> {quote}
> As I read this, when commit-method is called, the SourceTask-developer is
> "told" that everything returned from poll up until "now" has been sent/stored
> - both the outgoing messages and the associated connect-offsets. Looking at
> the implementation it also seems that this is what it tries to
> "guarantee/achieve".
> But as I read the code, this is not necessarily true.
> The following threads are involved:
> * Task-thread: WorkerSourceTask has its own thread running
> WorkerSourceTask.execute.
> * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled
> to call WorkerSourceTask.commitOffsets (from a different thread)
> The two threads synchronize (on the WorkerSourceTask-object) in sendRecords and
> commitOffsets respectively, preventing the task-thread from adding to
> outstandingMessages and offsetWriter while the committer-thread is marking what
> has to be flushed in the offsetWriter and waiting for outstandingMessages to
> be empty. This means that the offsets committed will be consistent with what
> has been sent out, but not necessarily with what has been polled. At least I do
> not see why the following is not possible:
> * Task-thread polls something from task.poll
> * Before task-thread gets to add (all) the polled records to
> outstandingMessages and offsetWriter in sendRecords, committer-thread kicks
> in and does its committing, while preventing the task-thread from adding the
> polled records to outstandingMessages and offsetWriter
> * Consistency will not have been compromised, but committer-thread will end
> up calling task.commit (via WorkerSourceTask.commitSourceTask), without the
> records just polled from task.poll having been sent or the corresponding
> connector-offsets flushed.
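
The interleaving in the list above can be replayed in a small model. This is only an illustrative sketch and not the actual WorkerSourceTask code: the class CommitRaceSketch, its fields, and the per-record sendRecord method are hypothetical stand-ins, and the two threads are replayed sequentially so the outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, heavily simplified model; NOT the real WorkerSourceTask. */
class CommitRaceSketch {
    private long lastPolled = -1;   // highest offset handed out by poll()
    private long lastFlushed = -1;  // highest offset flushed by commitOffsets()
    // Stands in for the offsets registered with offsetWriter (assumption).
    private final List<Long> pendingOffsets = new ArrayList<>();

    /** Task-thread: returns a batch; nothing is registered with the worker yet. */
    List<Long> poll(int batchSize) {
        List<Long> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            batch.add(++lastPolled);
        }
        return batch;
    }

    /** Task-thread: registers one record's offset, under the same lock as commitOffsets(). */
    synchronized void sendRecord(long offset) {
        pendingOffsets.add(offset);
    }

    /** Committer-thread: flushes only what sendRecord() has registered so far. */
    synchronized long commitOffsets() {
        for (long offset : pendingOffsets) {
            lastFlushed = Math.max(lastFlushed, offset);
        }
        pendingOffsets.clear();
        return lastFlushed;
    }

    long lastPolled() {
        return lastPolled;
    }

    /** Replays the interleaving; returns {last polled offset, last flushed offset}. */
    static long[] replay() {
        CommitRaceSketch worker = new CommitRaceSketch();
        List<Long> batch = worker.poll(3);      // task-thread: poll returns offsets 0, 1, 2
        worker.sendRecord(batch.get(0));        // task-thread: only the first record is registered
        long flushed = worker.commitOffsets();  // committer-thread kicks in here
        // In the scenario described, task.commit() would be called at this point.
        return new long[] {worker.lastPolled(), flushed};
    }

    public static void main(String[] args) {
        long[] result = replay();
        // prints "last polled = 2, last flushed = 0"
        System.out.println("last polled = " + result[0] + ", last flushed = " + result[1]);
    }
}
```

In this model the flushed offset (0) is consistent with what was sent, yet two polled records (offsets 1 and 2) are not covered when the commit hook would fire.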
> If I am right, I guess there are two ways to fix it:
> * Either change the java-doc of SourceTask.commit to something like the
> following (which I do believe is true)
> {quote}
> Commit the offsets, up to the offsets that have been returned by {@link #poll()}
> *and confirmed by a call to {@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect
> will record offsets automatically. This hook is provided for systems that
> also need to store offsets internally in their own system.
> {quote}
> * or, fix the "problem" so that it actually does what the java-doc says :-)
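
For a task author, the reworded java-doc would roughly mean: store only what commitRecord has confirmed, not everything poll has returned. The sketch below is an assumption-laden illustration, not Kafka code: AckTrackingTaskSketch is a hypothetical standalone class that mirrors poll, commitRecord and commit with plain long offsets instead of SourceRecord, and it assumes records are confirmed in offset order.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a SourceTask that also stores offsets in its own system. */
class AckTrackingTaskSketch {
    private final AtomicLong lastPolled = new AtomicLong(-1);
    private final AtomicLong lastAcked = new AtomicLong(-1);
    // Stands in for the external system's offset store (assumption).
    private volatile long storedOffset = -1;

    /** Mirrors poll(): hands out the next offset. */
    long poll() {
        return lastPolled.incrementAndGet();
    }

    /** Mirrors commitRecord(SourceRecord): called once a record has been sent. */
    void commitRecord(long offset) {
        // Assumes confirmations arrive in offset order; otherwise max() would overstate progress.
        lastAcked.accumulateAndGet(offset, Math::max);
    }

    /** Mirrors commit(): persists only confirmed offsets, not everything polled. */
    void commit() {
        storedOffset = lastAcked.get();
    }

    long storedOffset() {
        return storedOffset;
    }

    /** Polls three records, confirms only the first, then commits. */
    static long demo() {
        AckTrackingTaskSketch task = new AckTrackingTaskSketch();
        long first = task.poll();  // offset 0
        task.poll();               // offset 1, not yet confirmed
        task.poll();               // offset 2, not yet confirmed
        task.commitRecord(first);
        task.commit();
        return task.storedOffset(); // 0: the unconfirmed offsets are not stored
    }
}
```

With this bookkeeping, a commit that arrives between poll and the send of the polled records stores nothing beyond what was actually confirmed.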
> If I am not right, of course I apologize for the inconvenience. I would
> appreciate an explanation of where my code-inspection is incorrect, and why it
> works even though I cannot see it. I will not expect such an explanation,
> though.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)