[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212416#comment-16212416
 ] 

Per Steffensen commented on KAFKA-5716:
---------------------------------------

Having a hard time to figure out exactly what you like me to do and not. 
Therefore I just did changes that I find reasonable. Please comment!

I was fairly eager about doing related stuff, like deprecating, renaming 
related methods, being consistent on related stuff etc. Please let me know if I 
was too eager.

Only "public" changes, as I see it is
* SourceTask: offsetsFlushedAndAcknowledged and recordSentAndAcknowledged 
added, but commit and commitRecord still exists with same out-of-the-box 
semantics
* WorkerConfig: OFFSET_COMMIT_XXX constants renamed to OFFSET_FLUSH_XXX

Probably ought to write more here, but I am in a hurry right now.

> Connect: When SourceTask.commit it is possible not everthing from 
> SourceTask.poll has been sent
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5716
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Per Steffensen
>            Assignee: Per Steffensen
>            Priority: Minor
>         Attachments: KAFKA-5716.patch
>
>
> Not looking at the very latest code, so the "problem" may have been corrected 
> recently. If so, I apologize. I found the "problem" by code-inspection alone, 
> so I may be wrong. Have not had the time to write tests to confirm.
> According to java-doc on SourceTask.commit
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link 
> #poll()}. This
> method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect 
> will record offsets
> automatically. This hook is provided for systems that also need to store 
> offsets internally
> in their own system.
> {quote}
> As I read this, when commit-method is called, the SourceTask-developer is 
> "told" that everything returned from poll up until "now" has been sent/stored 
> - both the outgoing messages and the associated connect-offsets. Looking at 
> the implementation it also seems that this is what it tries to 
> "guarantee/achieve".
> But as I see read the code, it is not necessarily true
> The following threads are involved
> * Task-thread: WorkerSourceTask has its own thread running 
> WorkerSourceTask.execute.
> * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled 
> to call WorkerSourceTask.commitOffsets (from a different thread)
> The two thread synchronize (on the WorkerSourceTask-object) in sendRecord and 
> commitOffsets respectively, hindering the task-thread to add to 
> outstandingMessages and offsetWriter while committer-thread is marking what 
> has to be flushed in the offsetWriter and waiting for outstandingMessages to 
> be empty. This means that the offsets committed will be consistent with what 
> has been sent out, but not necessarily what has been polled. At least I do 
> not see why the following is not possible:
> * Task-thread polls something from the task.poll
> * Before task-thread gets to add (all) the polled records to 
> outstandingMessages and offsetWriter in sendRecords, committer-thread kicks 
> in and does its commiting, while hindering the task-thread adding the polled 
> records to outstandingMessages and offsetWriter
> * Consistency will not have been compromised, but committer-thread will end 
> up calling task.commit (via WorkerSourceTask.commitSourceTask), without the 
> records just polled from task.poll has been sent or corresponding 
> connector-offsets flushed.
> If I am right, I guess there are two way to fix it
> * Either change the java-doc of SourceTask.commit, to something a-la (which I 
> do believe is true)
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link 
> #poll()}
> *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect 
> will record offsets
> automatically. This hook is provided for systems that also need to store 
> offsets internally
> in their own system.
> {quote}
> * or, fix the "problem" so that it actually does what the java-doc says :-)
> If I am not right, of course I apologize for the inconvenience. I would 
> appreciate an explanation where my code-inspection is not correct, and why it 
> works even though I cannot see it. I will not expect such an explanation, 
> though.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to