ncliang commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r625714846



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -482,6 +486,22 @@ private synchronized void recordSent(final 
ProducerRecord<byte[], byte[]> record
         }
     }
 
+    private synchronized void recordSendFailed(ProducerRecord<byte[], byte[]> 
record) {
+        if (outstandingMessages.containsKey(record)) {
+            currentBatchFailed = true;
+            if (flushing) {
+                // flush thread may be waiting on the outstanding messages to 
clear
+                this.notifyAll();

Review comment:
       This implementation differs from the implementation of `recordSend` 
above when `outstandingMessages` contains records other than the current 
record. `recordSend` will wait until `outstandingMessages` is empty and 
`flushing` before `notifyAll` . Is this intentional because we've failed and 
can't clear the outstanding messages?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
##########
@@ -105,6 +105,11 @@ public void remove(ConnectorTaskId id) {
     }
 
     private void commit(WorkerSourceTask workerTask) {
+        if (!workerTask.shouldCommitOffsets()) {

Review comment:
       I tend to agree with @kpatelatwork that the `shouldCommit` method is 
better encapsulated in `WorkerSourceTask` . There is a call site within 
`WorkerSourceTask` that could also benefit from the `shouldCommit` check, for 
instance. 
https://github.com/apache/kafka/blob/a63e5be4195e97e5b825b5912291144d2d0283a3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L266
   
   For the testing aspect, could you make the method visible to tests to do the 
assertion?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to