[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16162824#comment-16162824
 ] 

Per Steffensen commented on KAFKA-5716:
---------------------------------------

Sorry for the late response. I have been busy (writing source-connectors :-) )

{quote}
{code}
+     * will be written before after the return of from the call to this method.
{code}
Can you rephrase the above ?
{quote}
"No additional offsets will be written or flushed before returning from the 
call to {{offsetsCommitted}}."

Do not know if thats better :-)

bq. Consider putting this as pull request (off trunk).

Ok, maybe I will later. I thought it probably would be a little too early. I 
guess, when submitting a pull-request, you actually have something you would 
like pulled. The patch I did is definitely not ready for inclusion in the 
actual code-base - at least not before having discussed thoroughly with you 
guys. And the final code to be pulled, should include a test testing that 
things work as expected - IMHO.
* Do you also use pull-request for early-not-ready-to-be-pulled code-submission?
* Do you have a "how to contribute" guide somewhere, that I should read, in 
order to be able to "follow procedure" correctly?

bq. Have you tried replicating this with a test case?

No unfortunately not. As I stated, it is by code-inspection only, that I put 
forward this issue.

bq. It might not be straightforward considering it is dependent upon the timing 
of the {{WorkerSourceTask}} implementation, but is seems like it should be 
possible to replicate with a mock offset writer and consumer. Not only will 
this help prove your conjecture, but it also will help verify that we can 
correctly fix it.

I believe it will be fairly easy to create a test that shows the problem (if I 
am right about its existence). But yes, you probably have to mock an 
{{OffsetStorageWriter}} with a slow {{doFlush}}-method or an 
{{OffsetBackingStore}} with a slow {{set}}-method.
If you can point me in the direction of some existing tests testing stuff like 
this, I may get the time to help write such a test. I am not sure how you 
usually write tests - the style, mocking, unit vs integration etc., so it would 
be nice with a pointer.

bq. Also, please be aware that all changes to the Connect API must be binary 
compatible with previous releases so that existing {{SourceTask}} 
implementations need not be changed or even recompiled to run on newer versions 
of the framework

Yeah, I thought so. At least such a change should go through a long deprecated 
period, and only actually make the incompatible change in a new major release. 
The renaming etc in my patch was just a suggestion. Lets forget about it for 
now.

bq. FWIW, I personally think the names of {{commitRecord(...)}}, {{commit()}}, 
and even a new {{commitOffsets(SourceRecord)}} on the {{SourceTask}} actually 
do make sense since they reflect the action that the source task should take.

IMHO FWIW, I do not agree. At least it depends on the source-system of the 
concrete {{SourceTask}}. THE NAMING IS NOT VERY IMPORTANT TO ME, so lets just 
forget about it. But because we are discussing it now, I will just elaborate on 
my "mental model" leading to my opinion on naming.

{{SourceTask.commitRecord(R)}} is a notification from Kafka-connect, that the 
outgoing message wrapped in a given {{SourceRecord}} R has been forwarded to 
and acknowledged by the receiving Kafka output-topic. The source-task typically 
produced this {{SourceRecord}} R, from some data D received/pulled from the 
source-system. What the source-task should typically do on 
{{SourceTask.commitRecord(R)}} is to ACKNOWLEDGE the reception, handling and 
forwarding of the input-data D. Such an "acknowledgement" may or may not 
include something that can be referred to as "commit".

Traditionally "commit" is used against output-systems, while "acknowledge" is 
used against input-systems. For a source-connector, Kafka-connect is handling 
the output-side, and therefore all "committing" against the output-side, while 
the source-task is handling the input-side and will have to "acknowledge" 
against the input-system. I believe the most common implementation in 
{{SourceTask.commitRecord}} will be "acknowledging" input-data received from 
the source-system.

E.g. consider a source-connector with source-tasks consuming data from a 
Kafka-topic. Yes, a source-connector where the source-system is Kafka itself - 
after all according to my "mental model" a source-connector is about making a 
none-Kafka data-source map into and "feel like" a Kafka data-source (streamed, 
partitioned, etc). Imaging such a source-connector with a Kafka-topic as 
"source". The source-connector would probably create tasks for each partition 
in the source-topic. The task would probably do something a-la:
* Receive a message (IM) from the designated partition of the source-topic via 
Kafka-consumer C
* Create an outgoing message (OM) containing the data from IM
* Wrap OM in a {{SourceRecord}} R, for the outgoing Kafka-topic
* Return OM from {{SourceTask.poll}}
* When Kafka-connect calls {{SourceTask.commitRecord(R)}} the source-task would 
call {{C.acknowledge(IM.offset())}} to acknowledge the reception, handling and 
successful forward of the data in IM.

IMHO a source-task TYPICALLY does acknowledging - not committing. So I do not 
agree that {{commitRecord(...)}} reflect the action that the source task should 
take (same goes for {{commit()}}). If you want to name them, so that they 
reflect the action that the source-task should take, they should be named 
{{acknowledgeSomething}}. But because acknowledging is not the only thing you 
can think of doing in this phase as a source-task, and because what you want to 
do really depends on the source-system, I would rather name the methods after 
what has just happened in Kafka-connect (committed the outgoing message), than 
after what the source-task should (may want to) do - you really cannot tell the 
source-task what it ought to do, because it depends on its source-system and 
the way it works, so maybe it is better just making clear which event just 
happened in Kafka-connect ({{recordCommitted(R)}} and 
{{offsetsCommitted(...)}}), and the source-task will have to figure out what it 
wants to do.

BUT AGAIN, THE NAMING IS NOT VERY IMPORTANT TO ME. And the above is all related 
to my "mental model" of what a source-connector is - and I may very well be 
off, compared to the thoughts others have. Please do not take the above as any 
kind of insult. It is just me explaining my thoughts, and it is very likely 
that I am just stupid here.

bq. ... but we deprecate it and can add a new method such as ...

Completely agree with everything you stated in that paragraph.

(y)

bq. Thanks again for all your work on this!

No problem. It has not been much work (yet). I am the one thanking you for 
taking the issue seriously enough to read and respond thoroughly.

Lets me know if I should try to find time to write a test and/or create a 
pull-request (not ready for pull, yet)

"I didn't have time to write a short letter, so I wrote a long one instead." - 
Mark Twain
Sorry!

> Connect: When SourceTask.commit it is possible not everthing from 
> SourceTask.poll has been sent
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5716
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Per Steffensen
>            Priority: Minor
>         Attachments: KAFKA-5716.patch
>
>
> Not looking at the very latest code, so the "problem" may have been corrected 
> recently. If so, I apologize. I found the "problem" by code-inspection alone, 
> so I may be wrong. Have not had the time to write tests to confirm.
> According to java-doc on SourceTask.commit
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link 
> #poll()}. This
> method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect 
> will record offsets
> automatically. This hook is provided for systems that also need to store 
> offsets internally
> in their own system.
> {quote}
> As I read this, when commit-method is called, the SourceTask-developer is 
> "told" that everything returned from poll up until "now" has been sent/stored 
> - both the outgoing messages and the associated connect-offsets. Looking at 
> the implementation it also seems that this is what it tries to 
> "guarantee/achieve".
> But as I see read the code, it is not necessarily true
> The following threads are involved
> * Task-thread: WorkerSourceTask has its own thread running 
> WorkerSourceTask.execute.
> * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled 
> to call WorkerSourceTask.commitOffsets (from a different thread)
> The two thread synchronize (on the WorkerSourceTask-object) in sendRecord and 
> commitOffsets respectively, hindering the task-thread to add to 
> outstandingMessages and offsetWriter while committer-thread is marking what 
> has to be flushed in the offsetWriter and waiting for outstandingMessages to 
> be empty. This means that the offsets committed will be consistent with what 
> has been sent out, but not necessarily what has been polled. At least I do 
> not see why the following is not possible:
> * Task-thread polls something from the task.poll
> * Before task-thread gets to add (all) the polled records to 
> outstandingMessages and offsetWriter in sendRecords, committer-thread kicks 
> in and does its commiting, while hindering the task-thread adding the polled 
> records to outstandingMessages and offsetWriter
> * Consistency will not have been compromised, but committer-thread will end 
> up calling task.commit (via WorkerSourceTask.commitSourceTask), without the 
> records just polled from task.poll has been sent or corresponding 
> connector-offsets flushed.
> If I am right, I guess there are two way to fix it
> * Either change the java-doc of SourceTask.commit, to something a-la (which I 
> do believe is true)
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link 
> #poll()}
> *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect 
> will record offsets
> automatically. This hook is provided for systems that also need to store 
> offsets internally
> in their own system.
> {quote}
> * or, fix the "problem" so that it actually does what the java-doc says :-)
> If I am not right, of course I apologize for the inconvenience. I would 
> appreciate an explanation where my code-inspection is not correct, and why it 
> works even though I cannot see it. I will not expect such an explanation, 
> though.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to