TheKnowles commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r793104806



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -364,9 +365,16 @@ private boolean sendRecords() {
                     producerRecord,
                     (recordMetadata, e) -> {
                         if (e != null) {
-                            log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
-                            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
-                            producerSendException.compareAndSet(null, e);
+                            if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
+                                // executeFailed here allows the use of existing logging infrastructure/configuration
+                                retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class,
+                                        preTransformRecord, e);
+                                commitTaskRecord(preTransformRecord, null);

Review comment:
       An earlier suggestion was to let the tolerance operator handle this 
through its error-logging report. I would personally find it useful to have the 
failure in the Connect log regardless of how tolerance error logging is 
configured, so I've moved the error/debug log lines above the tolerance check 
so that they are emitted in all cases.
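
       To illustrate the ordering described above, here is a minimal, 
self-contained sketch. It is not the code in the PR: the class 
SendCallbackSketch, its list-backed "log", and the skipped list are 
hypothetical stand-ins, and the non-tolerant branch is assumed to keep 
recording the exception the way the removed lines did.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the producer callback in WorkerSourceTask.
public class SendCallbackSketch {
    // Simplified stand-in for the Connect ToleranceType enum.
    enum ToleranceType { NONE, ALL }

    // Stand-in for the Connect log; a real task would use an SLF4J logger.
    final List<String> connectLog = new ArrayList<>();
    // Records that were skipped because errors are tolerated (hypothetical).
    final List<String> skipped = new ArrayList<>();
    // First producer failure seen when errors are not tolerated.
    final AtomicReference<Exception> producerSendException = new AtomicReference<>();

    private final ToleranceType tolerance;

    SendCallbackSketch(ToleranceType tolerance) {
        this.tolerance = tolerance;
    }

    // Mirrors the shape of the (recordMetadata, e) -> {...} callback.
    void onCompletion(String record, Exception e) {
        if (e == null) {
            return; // successful send, nothing to report
        }
        // Log first, unconditionally, so the failure always reaches the log.
        connectLog.add("failed to send record " + record + ": " + e.getMessage());
        if (tolerance == ToleranceType.ALL) {
            // Tolerated: skip the record and let the task continue.
            skipped.add(record);
        } else {
            // Assumption: not tolerated, so keep the first failure for later.
            producerSendException.compareAndSet(null, e);
        }
    }

    public static void main(String[] args) {
        SendCallbackSketch tolerant = new SendCallbackSketch(ToleranceType.ALL);
        tolerant.onCompletion("record-1", new RuntimeException("broker unavailable"));
        System.out.println(tolerant.connectLog);
        System.out.println(tolerant.skipped);
    }
}
```

       With either tolerance setting the failure is written to the log 
exactly once; only the handling after the log line differs.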

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -229,6 +246,13 @@ public synchronized boolean withinToleranceLimits() {
         }
     }
 
+    // For source connectors that want to skip kafka producer errors.
+    // They cannot use withinToleranceLimits() as no failure may have actually occurred prior to the producer failing
+    // to write to kafka.
+    public synchronized ToleranceType getErrorToleranceType() {

Review comment:
       It does not. The type is immutable and thread-safe. I had dug through 
the ticket that retroactively made this class thread-safe, and at the time it 
seemed like a good idea to slap a synchronized on the getter to match the rest 
of the class, but it is not necessary at all. Removed.
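
       As a rough illustration of why the keyword can go, the sketch below 
assumes the tolerance type is held in a final field set once in the 
constructor. ToleranceHolderSketch and its members are hypothetical and much 
simpler than RetryWithToleranceOperator; only the locking pattern is the point.

```java
// Hypothetical, simplified holder; not the real RetryWithToleranceOperator.
public class ToleranceHolderSketch {
    // Simplified stand-in for the Connect ToleranceType enum.
    enum ToleranceType { NONE, ALL }

    // Assumption: final and assigned once, so it is safely published to
    // every thread once the constructor has finished.
    private final ToleranceType errorToleranceType;

    // Mutable state: every access goes through a synchronized method.
    private int totalFailures;

    ToleranceHolderSketch(ToleranceType errorToleranceType) {
        this.errorToleranceType = errorToleranceType;
    }

    // Needs the lock because it writes shared mutable state.
    synchronized void markAsFailed() {
        totalFailures++;
    }

    // Needs the lock because it reads shared mutable state.
    synchronized boolean withinToleranceLimits() {
        return errorToleranceType == ToleranceType.ALL || totalFailures == 0;
    }

    // No lock needed: reads only an immutable enum held in a final field.
    ToleranceType getErrorToleranceType() {
        return errorToleranceType;
    }

    public static void main(String[] args) {
        ToleranceHolderSketch holder = new ToleranceHolderSketch(ToleranceType.ALL);
        holder.markAsFailed();
        System.out.println(holder.getErrorToleranceType());
        System.out.println(holder.withinToleranceLimits());
    }
}
```

       The getter stays correct without the lock because nothing can change 
the field after construction; the failure counter is the part that still 
needs synchronization.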




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
