[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171249#comment-16171249
 ] 

Per Steffensen commented on KAFKA-5716:
---------------------------------------

FWIW

bq. WorkerSourceTask doesn't have synchronization around at least one call to 
commitSourceTask (which invokes the SourceTask.commit()) method so you may call 
this while the main thread is invoking some other method on the connector.

{{commitSourceTask}} is only called by {{commitOffsets}}, which is only 
(intended to be?) called by {{SourceTaskOffsetCommitter}}, so there is no need 
for synchronization there to prevent it from being called several times 
concurrently. ({{commitOffsets}} is also called by the main thread when 
{{execute}} finishes, but apart from that it is only called by the committer.) 
I guess that, with the synchronization that is in {{commitOffsets}}, it works 
nicely when called by {{SourceTaskOffsetCommitter}} at the same time as the 
main thread is calling other methods on the connector. It provides these 
semantics: {{task.commit()}} is called when offsets have been written and 
flushed, and the offsets that have been written and flushed will always include 
everything from a {{task.poll}} (and all previous polls), BUT NOT NECESSARILY 
THE LAST (FEW) POLL(S). It is all or nothing per poll. The only problem I see 
is that the flushed offsets do not necessarily contain offsets from records 
from the last (few) poll(s). That is what this KAFKA-5716 is about.
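To illustrate the "all or nothing per poll" property, here is a minimal Java sketch under simplifying assumptions: {{AllOrNothingPerPoll}}, {{sendBatch}} and {{snapshotSize}} are hypothetical stand-ins, not the real {{WorkerSourceTask}} code, and the offset writer is modeled as a plain list guarded by a single lock. Because a whole batch is added inside one synchronized block and the committer reads under the same lock, a snapshot can never contain half a poll.

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model (NOT the real WorkerSourceTask) of the locking:
 * the task thread hands every record of one poll() batch to the offset writer inside
 * a single synchronized block, and the committer snapshots under the same lock.
 */
public class AllOrNothingPerPoll {
    private final Object lock = new Object();
    // Hypothetical stand-in for the offset writer: ids of records whose offsets it holds.
    private final List<String> offsetWriter = new ArrayList<>();

    /** Task thread: add the offsets of one polled batch atomically. */
    void sendBatch(int pollNumber, int batchSize) {
        synchronized (lock) {
            for (int i = 0; i < batchSize; i++) {
                offsetWriter.add("poll" + pollNumber + "-" + i);
            }
        }
    }

    /** Committer thread: how many record offsets a commit started now would include. */
    int snapshotSize() {
        synchronized (lock) {
            return offsetWriter.size();
        }
    }

    /** Runs both threads concurrently and returns the size seen by each snapshot. */
    public static List<Integer> run(int batchSize, int polls) {
        AllOrNothingPerPoll model = new AllOrNothingPerPoll();
        List<Integer> observed = new ArrayList<>(); // written only by the committer thread
        Thread task = new Thread(() -> {
            for (int p = 0; p < polls; p++) {
                model.sendBatch(p, batchSize);
            }
        });
        Thread committer = new Thread(() -> {
            for (int c = 0; c < polls; c++) {
                observed.add(model.snapshotSize());
            }
        });
        task.start();
        committer.start();
        try {
            task.join();
            committer.join(); // join() makes 'observed' safely visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        observed.add(model.snapshotSize()); // final commit when execute() finishes
        return observed;
    }

    public static void main(String[] args) {
        List<Integer> sizes = run(5, 1000);
        boolean wholePolls = sizes.stream().allMatch(s -> s % 5 == 0);
        System.out.println("every snapshot contained whole polls only: " + wholePolls);
    }
}
{code}

What the model cannot tell the task is how many of the most recent polls a given snapshot missed, which is exactly the gap this issue is about.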

bq. Correcting the code would be tough

I think it is a matter of exposing to {{task.commit}} which polled records have 
had their offset changes included in the offset write and which have not. 
I guess it could be done by just passing "the last record that has been 
included in the offset write", which indirectly indicates the last poll that 
has been included. Please see the attached patch. And then change 
{{SourceTask}} to:
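{code}
// Proposed: called with the last polled record whose offset has been written and flushed.
public void commit(SourceRecord lastPolledRecordWithOffsetsWritten) throws InterruptedException {
    // Default implementation delegates to the old no-arg hook for backwards compatibility.
    commit();
}

// Existing hook: ambiguous about which polled records are covered.
public void commit() throws InterruptedException {
    // This space intentionally left blank.
}
{code}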
Plus some updated JavaDoc, maybe including deprecation of {{commit()}}, because 
it is ambiguous.
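To show how a task could use the proposed hook, here is a minimal sketch. {{AckUpToLastWritten}} and {{Rec}} (a stand-in for {{SourceRecord}}) are hypothetical names, not Kafka API. The task remembers polled records in poll order and, when told the last record whose offset was written, acknowledges upstream everything up to and including that record, leaving later polls pending.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical sketch of a task using the proposed
 * commit(lastPolledRecordWithOffsetsWritten) hook. Nothing here is Kafka API.
 */
public class AckUpToLastWritten {
    /** Hypothetical stand-in for SourceRecord, carrying only an upstream message id. */
    public static final class Rec {
        public final String upstreamId;
        public Rec(String upstreamId) { this.upstreamId = upstreamId; }
    }

    private final Deque<Rec> unacked = new ArrayDeque<>(); // in poll order
    private final List<String> acknowledged = new ArrayList<>();

    /** Called from poll(): remember the records in the order they are returned. */
    public synchronized List<Rec> poll(List<Rec> fetched) {
        unacked.addAll(fetched);
        return fetched;
    }

    /** Proposed hook: offsets of all records up to and including lastWritten are flushed. */
    public synchronized void commit(Rec lastWritten) {
        if (lastWritten == null || !unacked.contains(lastWritten)) {
            return; // nothing new was written, or it was already acknowledged
        }
        Rec r;
        do {
            r = unacked.pollFirst();
            acknowledged.add(r.upstreamId); // now safe to acknowledge upstream
        } while (r != lastWritten);
    }

    public synchronized List<String> acknowledged() { return new ArrayList<>(acknowledged); }

    public synchronized int unackedCount() { return unacked.size(); }

    public static void main(String[] args) {
        AckUpToLastWritten task = new AckUpToLastWritten();
        Rec a = new Rec("a"), b = new Rec("b"), c = new Rec("c");
        task.poll(List.of(a, b)); // first poll
        task.poll(List.of(c));    // second poll, not yet included in the offset write
        task.commit(b);
        System.out.println(task.acknowledged() + " acknowledged, " + task.unackedCount() + " pending");
    }
}
{code}

With two polls [a, b] and [c], {{commit(b)}} acknowledges a and b and leaves c pending until a later commit covers it.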

bq. I'd propose a different solution: let's just remove the method

Uhhh, please do not. Then the {{SourceTask}} implementation will have 
absolutely no clue when offsets have been written and flushed. {{commitRecord}} 
will not help, because when Kafka acknowledges outgoing records (triggering 
{{commitRecord}}) is completely independent of when record offsets are written 
to offset storage (triggering {{commit}}).

bq. Given its current state, it seems unlikely anyone is actually using this 
functionality anyway

I am! And couldn't live without it :-) I cannot afford to acknowledge data 
going into my source connector before the corresponding outgoing records AND 
their offset changes have been written, flushed and acknowledged. Today I 
pretend that everything polled has been written to offset storage when 
{{task.commit()}} is called. That is not always entirely true, but it is 
close. Not knowing anything about when offsets have been written to offset 
storage would definitely leave me in the dark.
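A conservative way to express that requirement is to release upstream data only once both signals have been seen for a record. This is a hypothetical sketch: {{DualSignalTracker}} and its methods are made-up names, and it assumes the task can somehow learn which record ids had their offsets flushed, which is exactly what the current {{commit()}} does not tell it.

{code:java}
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch: upstream data for a record is released only after BOTH
 * Kafka's acknowledgement (seen via commitRecord) and the offset flush (seen via
 * commit) have arrived. Record ids are plain strings for illustration.
 */
public class DualSignalTracker {
    private final Set<String> ackedByKafka = new LinkedHashSet<>();  // from commitRecord(...)
    private final Set<String> offsetFlushed = new LinkedHashSet<>(); // from commit(...), assumed
    private final List<String> released = new ArrayList<>();

    public synchronized void onCommitRecord(String id) {
        ackedByKafka.add(id);
        releaseReady();
    }

    public synchronized void onOffsetsFlushed(Collection<String> ids) {
        offsetFlushed.addAll(ids);
        releaseReady();
    }

    // Move every id that has received both signals to 'released'.
    private void releaseReady() {
        Iterator<String> it = ackedByKafka.iterator();
        while (it.hasNext()) {
            String id = it.next();
            if (offsetFlushed.remove(id)) {
                it.remove();
                released.add(id);
            }
        }
    }

    public synchronized List<String> released() { return new ArrayList<>(released); }

    public static void main(String[] args) {
        DualSignalTracker tracker = new DualSignalTracker();
        tracker.onCommitRecord("r1");            // Kafka acked r1, offset not flushed yet
        System.out.println(tracker.released());  // r1 is not released yet
        tracker.onOffsetsFlushed(List.of("r1"));
        System.out.println(tracker.released());  // now r1 is released
    }
}
{code}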

bq. If someone did want this functionality, we likely should just add a new 
{{commit(Map<SourcePartition, SourceOffset>)}} variant

That is a good alternative to the {{commit(SourceRecord 
lastPolledRecordWithOffsetsWritten)}} variant I suggested.
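For comparison, a task consuming such a {{commit(Map<SourcePartition, SourceOffset>)}} variant might look roughly like this. Assumptions: the hook itself is hypothetical, {{PerPartitionCommit}} is a made-up name, and partitions are plain strings with long offsets that increase per partition (in Connect, source partitions and offsets are really {{Map<String, ?>}}).

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of a task consuming a commit(Map<partition, offset>) variant.
 * Assumes string partitions and long offsets that grow monotonically per partition.
 */
public class PerPartitionCommit {
    private final Map<String, Deque<Long>> pending = new HashMap<>();
    private final Map<String, Long> acknowledgedUpTo = new HashMap<>();

    /** Remember each polled record's source offset per partition, in poll order. */
    public synchronized void polled(String partition, long offset) {
        pending.computeIfAbsent(partition, k -> new ArrayDeque<>()).addLast(offset);
    }

    /** Hypothetical hook: the offsets actually written and flushed, per partition. */
    public synchronized void commit(Map<String, Long> flushed) {
        for (Map.Entry<String, Long> e : flushed.entrySet()) {
            Deque<Long> queue = pending.get(e.getKey());
            if (queue == null) {
                continue; // nothing pending for this partition
            }
            long committed = e.getValue();
            // Acknowledge upstream every pending offset covered by the flushed offset.
            while (!queue.isEmpty() && queue.peekFirst() <= committed) {
                acknowledgedUpTo.put(e.getKey(), queue.pollFirst());
            }
        }
    }

    public synchronized Long acknowledgedUpTo(String partition) {
        return acknowledgedUpTo.get(partition);
    }

    public synchronized int pendingCount(String partition) {
        Deque<Long> queue = pending.get(partition);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        PerPartitionCommit task = new PerPartitionCommit();
        task.polled("p0", 1);
        task.polled("p0", 2);
        task.polled("p0", 3);
        task.polled("p1", 10);
        task.commit(Map.of("p0", 2L)); // p0 flushed through offset 2; p1 not included
        System.out.println("p0 acked up to " + task.acknowledgedUpTo("p0") + ", pending " + task.pendingCount("p0"));
        System.out.println("p1 acked up to " + task.acknowledgedUpTo("p1") + ", pending " + task.pendingCount("p1"));
    }
}
{code}

Compared to passing only the last written record, the map makes it explicit per partition how far the flushed offsets reach.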

bq. It'd be interesting to know if the method is overridden in any connectors 
that we know about in the wild.

I hope it is often overridden, because that is the only clue you have about 
when offsets have been written to offset storage. So if you are actually using 
connect-offsets (and you probably are, since you are using Connect), your 
system is probably incorrect if you do not take advantage of {{commit()}}. 
You cannot use {{commitRecord}} for that.

bq. If we just wanted a short term fix, we could definitely update the javadoc 
to make it clear what's actually happening and that this probably isn't what 
you want.

That would be easy :-) But it would not leave the {{SourceTask}} implementor in 
any better position. It would still be undefined which polled records had their 
offsets written and which had not. This small change would help:
{code}
public void commit(SourceRecord lastPolledRecordWithOffsetsWritten) throws InterruptedException {
    commit();
}
public void commit() throws InterruptedException {
    // This space intentionally left blank.
}
{code}
Or the {{commit(Map<SourcePartition, SourceOffset>)}} variant you suggest.

> Connect: When SourceTask.commit it is possible not everthing from 
> SourceTask.poll has been sent
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5716
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Per Steffensen
>            Priority: Minor
>         Attachments: KAFKA-5716.patch
>
>
> Not looking at the very latest code, so the "problem" may have been corrected 
> recently. If so, I apologize. I found the "problem" by code inspection alone, 
> so I may be wrong. I have not had the time to write tests to confirm.
> According to the JavaDoc on SourceTask.commit:
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will
> record offsets automatically. This hook is provided for systems that also need to
> store offsets internally in their own system.
> {quote}
> As I read this, when the commit method is called, the SourceTask developer is 
> "told" that everything returned from poll up until "now" has been sent/stored, 
> both the outgoing messages and the associated connect-offsets. Looking at 
> the implementation, it also seems that this is what it tries to 
> "guarantee/achieve".
> But as I read the code, this is not necessarily true.
> The following threads are involved:
> * Task-thread: WorkerSourceTask has its own thread running 
> WorkerSourceTask.execute.
> * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled 
> to call WorkerSourceTask.commitOffsets (from a different thread)
> The two threads synchronize (on the WorkerSourceTask object) in sendRecords and 
> commitOffsets respectively, preventing the task-thread from adding to 
> outstandingMessages and offsetWriter while the committer-thread is marking what 
> has to be flushed in the offsetWriter and waiting for outstandingMessages to 
> be empty. This means that the offsets committed will be consistent with what 
> has been sent out, but not necessarily with what has been polled. At least I do 
> not see why the following is not possible:
> * Task-thread polls something from task.poll
> * Before the task-thread gets to add (all) the polled records to 
> outstandingMessages and offsetWriter in sendRecords, the committer-thread kicks 
> in and does its committing, preventing the task-thread from adding the polled 
> records to outstandingMessages and offsetWriter
> * Consistency will not have been compromised, but the committer-thread will end 
> up calling task.commit (via WorkerSourceTask.commitSourceTask) without the 
> records just polled from task.poll having been sent or their corresponding 
> connector-offsets flushed.
> If I am right, I guess there are two ways to fix it:
> * Either change the JavaDoc of SourceTask.commit to something like this (which I 
> do believe is true):
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}
> *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will
> record offsets automatically. This hook is provided for systems that also need to
> store offsets internally in their own system.
> {quote}
> * or fix the "problem" so that it actually does what the JavaDoc says :-)
> If I am not right, of course I apologize for the inconvenience. I would 
> appreciate an explanation of where my code inspection is wrong, and why it 
> works even though I cannot see it. I do not expect such an explanation, 
> though.
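To make the interleaving from the issue description concrete, here is a minimal, deterministic Java sketch. It is a simplified model under stated assumptions, not the real {{WorkerSourceTask}}: {{CommitBeforeSendRace}}, its {{sendRecords}}/{{commitOffsets}} stand-ins and the two {{CountDownLatch}}es that force the order are all hypothetical. The task thread has returned from {{poll()}}, the committer takes the lock first, and {{task.commit()}} ends up covering only the earlier poll.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Hypothetical, simplified model (NOT the real WorkerSourceTask) that forces the
 * interleaving: poll() returns, the committer commits, and only then does the task
 * thread hand the polled records to the offset writer.
 */
public class CommitBeforeSendRace {
    private final Object lock = new Object();
    private final List<String> offsetWriter = new ArrayList<>(); // stand-in for offsetWriter
    private volatile List<String> lastCommitted = List.of();     // what task.commit() covered

    /** Task thread: hand a polled batch to the offset writer (cf. sendRecords). */
    void sendRecords(List<String> batch) {
        synchronized (lock) {
            offsetWriter.addAll(batch);
        }
    }

    /** Committer thread: snapshot under the lock, then "flush" and call task.commit(). */
    void commitOffsets() {
        List<String> snapshot;
        synchronized (lock) {
            // The real code also waits for outstanding messages here; omitted.
            snapshot = new ArrayList<>(offsetWriter);
        }
        // Flushing to offset storage is omitted; task.commit() would be invoked now.
        lastCommitted = snapshot;
    }

    public List<String> seenByTaskCommit() { return lastCommitted; }

    public int writerSize() {
        synchronized (lock) {
            return offsetWriter.size();
        }
    }

    public static CommitBeforeSendRace run() {
        CommitBeforeSendRace model = new CommitBeforeSendRace();
        model.sendRecords(List.of("poll1-a", "poll1-b")); // an earlier poll, already handed over
        CountDownLatch polled = new CountDownLatch(1);
        CountDownLatch committed = new CountDownLatch(1);
        Thread taskThread = new Thread(() -> {
            List<String> batch = List.of("poll2-a", "poll2-b"); // just returned by task.poll()
            polled.countDown();      // poll() has returned ...
            awaitQuietly(committed); // ... but the committer runs first
            model.sendRecords(batch);
        });
        Thread committerThread = new Thread(() -> {
            awaitQuietly(polled);
            model.commitOffsets();   // task.commit() runs without poll2's records
            committed.countDown();
        });
        taskThread.start();
        committerThread.start();
        joinQuietly(taskThread);
        joinQuietly(committerThread);
        return model;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private static void joinQuietly(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CommitBeforeSendRace model = run();
        System.out.println("task.commit() covered: " + model.seenByTaskCommit());
        System.out.println("offset writer now holds " + model.writerSize() + " records");
    }
}
{code}

In this model the commit stays consistent (it never sees half of poll 2), yet the task is told "committed" while poll 2's records are neither sent nor flushed, which is the gap described above.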



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
