TheKnowles commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r793104975



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -111,6 +111,23 @@ public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMi
         return errantRecordFuture;
     }
 
+    public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
+                                                   SourceRecord sourceRecord,
+                                                   Throwable error) {
+
+        markAsFailed();
+        context.sourceRecord(sourceRecord);
+        context.currentContext(stage, executingClass);
+        context.error(error);
+        errorHandlingMetrics.recordFailure();
+        Future<Void> errantRecordFuture = context.report();
+        if (!withinToleranceLimits()) {
+            errorHandlingMetrics.recordError();
+            throw new ConnectException("Tolerance exceeded in error handler", error);

Review comment:
       I added some context to the error message string indicating that it comes from a Source Worker. I am open to suggestions on how verbose this message should be.
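
       For discussion, the tolerance check in the hunk above can be sketched in isolation. This is only a minimal illustration under stated assumptions, not the Kafka Connect code: `ToleranceSketch`, `ToleranceExceededException`, the `maxTolerated` parameter, and the exact message wording are hypothetical stand-ins, and the stage is passed as a plain string.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in for RetryWithToleranceOperator; not the Kafka Connect class.
class ToleranceSketch {

    // Hypothetical stand-in for ConnectException.
    static class ToleranceExceededException extends RuntimeException {
        ToleranceExceededException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Assumed knob: how many failures may be ignored before the task fails.
    private final long maxTolerated;
    private long failures = 0;

    ToleranceSketch(long maxTolerated) {
        this.maxTolerated = maxTolerated;
    }

    // Follows the shape of executeFailed: count the failure, then throw once
    // the limit is exceeded. Otherwise return an already-completed future,
    // which stands in for the errant-record report.
    synchronized Future<Void> executeFailed(String stage, Throwable error) {
        failures++;
        if (failures > maxTolerated) {
            // Illustrative wording only; the real message may differ.
            throw new ToleranceExceededException(
                "Tolerance exceeded in source worker error handler (stage " + stage + ")",
                error);
        }
        return CompletableFuture.completedFuture(null);
    }

    synchronized long failures() {
        return failures;
    }
}
```

       With `new ToleranceSketch(1)`, the first failed record is tolerated and the second call throws, carrying the original producer exception as the cause.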

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##########
@@ -222,6 +222,13 @@ private void createWorkerTask() {
         createWorkerTask(TargetState.STARTED);
     }
 
+    private void createWorkerTaskWithErrorToleration() {

Review comment:
       +1. I have refactored the constructors to be cleaner, with various parameter lists.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##########
@@ -815,6 +822,32 @@ public void testSendRecordsTaskCommitRecordFail() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSourceTaskIgnoresProducerException() throws Exception {
+        createWorkerTaskWithErrorToleration();
+        expectTopicCreation(TOPIC);
+
+        // send two records
+        // record 1 will succeed
+        // record 2 will invoke the producer's failure callback, but ignore the exception via retryOperator
+        // and no ConnectException will be thrown
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+
+        expectSendRecordOnce();
+        expectSendRecordProducerCallbackFail();
+        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.anyObject(RecordMetadata.class));

Review comment:
       +1



