TheKnowles commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r793104806
##########
File path:
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -364,9 +365,16 @@ private boolean sendRecords() {
producerRecord,
(recordMetadata, e) -> {
if (e != null) {
- log.error("{} failed to send record to {}: ",
WorkerSourceTask.this, topic, e);
- log.trace("{} Failed record: {}",
WorkerSourceTask.this, preTransformRecord);
- producerSendException.compareAndSet(null, e);
+ if
(retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
+ // executeFailed here allows the use of
existing logging infrastructure/configuration
+
retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE,
WorkerSourceTask.class,
+ preTransformRecord, e);
+ commitTaskRecord(preTransformRecord, null);
Review comment:
Previously it was suggested to have the tolerance operator handle via
the logging report. I would personally find it useful to have it in the connect
log regardless of tolerance error logging configuration. I've moved the
error/debug log lines to above the tolerance check to log in all instances.
##########
File path:
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -229,6 +246,13 @@ public synchronized boolean withinToleranceLimits() {
}
}
+ // For source connectors that want to skip kafka producer errors.
+ // They cannot use withinToleranceLimits() as no failure may have actually
occurred prior to the producer failing
+ // to write to kafka.
+ public synchronized ToleranceType getErrorToleranceType() {
Review comment:
It does not. Type is immutable and thread safe. I had dug through the
ticket that retroactively made this class thread safe and it seemed like a good
idea at the time to slap a synchronized on it to match the rest of the class,
but is not necessary at all. Removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]