[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16176582#comment-16176582 ]
Randall Hauch commented on KAFKA-5716:
--------------------------------------

{quote}
What I am using it for can in some cases be described as "transfer data from some temporary storage (source-system) to Kafka, and when it has been safely transferred it is deleted from the source-system"
{quote}

Yeah, this makes sense. The existing {{SourceTask.commitRecord(...)}} method is called after each source record has been written to Kafka -- can you use this to keep track of the offsets that have been written? Yes, you have to do more bookkeeping, and you'd likely want to accumulate a bunch of offsets. But if you know ahead of time the last offset in a particular file, you may be able to react to those "last offsets" by removing the file.

Note that this allows you to detect that the records *have been written to Kafka*. With this approach, if Connect were to crash before the most recent offsets have been flushed/committed, then upon restart Connect might want to start with offsets that are too old. But your connector would know that if the file(s) described by the last persisted offsets were removed, they would have been cleaned up once all of their records were written to Kafka. Could the connector just skip those files?

{quote}
and in other cases as just "advanced Kafka-to-Kafka ETL"
{quote}

I wonder if you could do something similar for this case by relying upon {{commitRecord(...)}} to know when particular records have been written to Kafka. If so, I think the difference in behavior is whether you believe the offsets for *written records* or the *last committed offsets* are the "most correct" information. Recall that the last committed offsets may not represent the most recently *written* records, whereas the offsets from {{commitRecord(...)}} are definitely the offsets for the most recently-written records.
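
The bookkeeping suggested above could be sketched roughly as follows. This is only an illustration under stated assumptions, not part of the Connect API: the {{FileCleanupTracker}} class and its method names are hypothetical, offsets are assumed to be simple per-file positions, and acknowledgements for one file are assumed to arrive in offset order (for example, because all of its records go to one topic partition). If that ordering cannot be assumed, counting outstanding records per file would be a safer variant.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for a source connector; not a Kafka Connect class. */
final class FileCleanupTracker {
    // Last offset of each file, registered once the file has been fully read.
    private final Map<String, Long> lastOffsetByFile = new HashMap<>();
    // Files whose last record has been acknowledged and that may be deleted.
    private final List<String> deletable = new ArrayList<>();

    /** Called by the task once it knows the final offset of a file. */
    synchronized void registerFile(String file, long lastOffset) {
        lastOffsetByFile.put(file, lastOffset);
    }

    /**
     * Intended to be called from SourceTask.commitRecord(...), with the file
     * and offset taken from the record's sourcePartition()/sourceOffset().
     * Assumes acknowledgements for a given file arrive in offset order.
     */
    synchronized void recordWritten(String file, long offset) {
        Long last = lastOffsetByFile.get(file);
        if (last != null && offset >= last) {
            lastOffsetByFile.remove(file);
            deletable.add(file);
        }
    }

    /** Returns the files that are safe to delete and clears the pending list. */
    synchronized List<String> drainDeletable() {
        List<String> result = new ArrayList<>(deletable);
        deletable.clear();
        return result;
    }
}
```

The methods are synchronized because {{commitRecord(...)}} may be invoked from a different thread than {{poll()}}. A task could call {{drainDeletable()}} at the start of each {{poll()}} and remove the returned files from the source system.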

> Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5716
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Per Steffensen
>            Priority: Minor
>         Attachments: KAFKA-5716.patch
>
>
> Not looking at the very latest code, so the "problem" may have been corrected recently. If so, I apologize. I found the "problem" by code-inspection alone, so I may be wrong. Have not had the time to write tests to confirm.
> According to java-doc on SourceTask.commit
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}. This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets automatically. This hook is provided for systems that also need to store offsets internally in their own system.
> {quote}
> As I read this, when the commit-method is called, the SourceTask-developer is "told" that everything returned from poll up until "now" has been sent/stored - both the outgoing messages and the associated connect-offsets. Looking at the implementation it also seems that this is what it tries to "guarantee/achieve".
> But as I read the code, it is not necessarily true.
> The following threads are involved:
> * Task-thread: WorkerSourceTask has its own thread running WorkerSourceTask.execute.
> * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled to call WorkerSourceTask.commitOffsets (from a different thread)
> The two threads synchronize (on the WorkerSourceTask-object) in sendRecords and commitOffsets respectively, preventing the task-thread from adding to outstandingMessages and offsetWriter while the committer-thread is marking what has to be flushed in the offsetWriter and waiting for outstandingMessages to be empty. This means that the offsets committed will be consistent with what has been sent out, but not necessarily with what has been polled. At least I do not see why the following is not possible:
> * Task-thread polls something from task.poll
> * Before the task-thread gets to add (all) the polled records to outstandingMessages and offsetWriter in sendRecords, the committer-thread kicks in and does its committing, while preventing the task-thread from adding the polled records to outstandingMessages and offsetWriter
> * Consistency will not have been compromised, but the committer-thread will end up calling task.commit (via WorkerSourceTask.commitSourceTask) without the records just polled from task.poll having been sent or the corresponding connector-offsets flushed.
> If I am right, I guess there are two ways to fix it:
> * Either change the java-doc of SourceTask.commit to something like the following (which I do believe is true)
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()} *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets automatically. This hook is provided for systems that also need to store offsets internally in their own system.
> {quote}
> * or, fix the "problem" so that it actually does what the java-doc says :-)
> If I am not right, of course I apologize for the inconvenience.
> I would appreciate an explanation where my code-inspection is not correct, and why it works even though I cannot see it. I will not expect such an explanation, though.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)