C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r728049488



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -250,6 +248,9 @@ public void execute() {
                         recordPollReturned(toSend.size(), time.milliseconds() - start);
                     }
                 }
+
+                updateCommittableOffsets();
+

Review comment:
   Ugh, sorry. Your initial point was very clear, although I really appreciate
the detailed writeup here. It was an implementation snafu. I wanted to handle
the case where `poll` produced no records, which meant invoking
`updateCommittableOffsets` before the `if (toSend == null) continue;` section.
Of course, that didn't address the original concern, which is that we may miss
a chance to update offsets for records that were just dispatched to the
producer in `sendRecords`.

   I like the idea of placing `updateCommittableOffsets` at the top of the
loop, right before the `if (shouldPause())` check; will do.
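
   For illustration, here is a minimal, self-contained sketch of the ordering
described above. It is not the actual `WorkerSourceTask` code: the class
`PollLoopSketch`, its fields, and the simulated `poll`, `sendRecords`, and
`shouldPause` bodies are all hypothetical stand-ins, and it assumes every
dispatched record is acknowledged immediately.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for the task loop; none of this is Kafka Connect code.
class PollLoopSketch {
    // Offsets handed to the (simulated) producer but not yet marked committable.
    private final Deque<Long> dispatched = new ArrayDeque<>();
    // Scripted poll results; a null entry simulates a poll that produced no records.
    private final List<List<Long>> pollResults;
    private int nextPoll = 0;
    // Highest offset a commit would currently include; -1 means nothing yet.
    private long committableOffset = -1L;

    PollLoopSketch(List<List<Long>> pollResults) {
        this.pollResults = pollResults;
    }

    // Simulated pause check; this sketch never pauses.
    private boolean shouldPause() {
        return false;
    }

    // Returns the next scripted batch, or null once the script is exhausted.
    private List<Long> poll() {
        return nextPoll < pollResults.size() ? pollResults.get(nextPoll++) : null;
    }

    // Assumption: dispatching a record also acknowledges it immediately.
    private void sendRecords(List<Long> toSend) {
        dispatched.addAll(toSend);
    }

    // Drains everything dispatched so far into the committable offset.
    private void updateCommittableOffsets() {
        while (!dispatched.isEmpty()) {
            committableOffset = Math.max(committableOffset, dispatched.poll());
        }
    }

    // Runs the loop body the given number of times and returns the committable offset.
    long run(int iterations) {
        for (int i = 0; i < iterations; i++) {
            // Top of the loop: picks up records dispatched by the previous
            // iteration, even if this iteration's poll returns nothing.
            updateCommittableOffsets();

            if (shouldPause()) {
                continue;
            }

            List<Long> toSend = poll();
            if (toSend == null) {
                continue;
            }

            sendRecords(toSend);
        }
        return committableOffset;
    }
}
```

   In this sketch, if the first poll returns offsets 0 through 2 and the second
returns `null`, the second iteration still advances the committable offset to
2, because the update runs before the pause and null checks rather than after
them.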




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

