TheKnowles commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r753523689
##########
File path:
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -366,7 +367,11 @@ private boolean sendRecords() {
if (e != null) {
log.error("{} failed to send record to {}: ",
WorkerSourceTask.this, topic, e);
Review comment:
Now that this could be a tolerated error, it makes sense for it to respect
the errors.log.enable configuration. The log line would be duplicated,
though: written unconditionally when we do not tolerate the error, and
behind a config check when we do.
Are you envisioning something like this?
```
if (retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
    if (errorLogEnabled) { // get this value from the config in some manner
        log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
        log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
    }
    commitTaskRecord(preTransformRecord, null);
} else {
    log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
    log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
    producerSendException.compareAndSet(null, e);
}
```
I would need to look more closely at the other layers of objects on top of
the SourceTask. enableErrorLog() is available in the ConnectorConfig, but only
the SinkConnectorConfig makes use of it, so I would need to spin up some
additional infrastructure. I am not sure whether I would want to add
WorkerErrantRecordReporter to WorkerSourceTask or have the configuration
passed down in some other manner.
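One way to avoid the duplicated log statements might be to fold both
conditions into a single check. The following is only a minimal,
self-contained sketch of that idea, not the actual WorkerSourceTask code:
SendFailureSketch, Tolerance, and the counters are hypothetical stand-ins
rather than Kafka classes, and errorLogEnabled is assumed to be populated
from errors.log.enable in whatever way we end up choosing.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the producer send callback; not a Kafka class.
final class SendFailureSketch {
    // Simplified, assumed mirror of the two tolerance settings (none / all).
    enum Tolerance { NONE, ALL }

    private final Tolerance tolerance;
    private final boolean errorLogEnabled; // assumed to come from errors.log.enable

    final AtomicReference<Throwable> producerSendException = new AtomicReference<>();
    int loggedErrors = 0;   // counts error log lines in place of a real logger
    int skippedRecords = 0; // counts records committed as skipped

    SendFailureSketch(Tolerance tolerance, boolean errorLogEnabled) {
        this.tolerance = tolerance;
        this.errorLogEnabled = errorLogEnabled;
    }

    void onSendFailure(String topic, Throwable e) {
        boolean tolerated = tolerance == Tolerance.ALL;

        // Single log site: always log a fatal failure, and log a tolerated
        // failure only when error logging is enabled.
        if (!tolerated || errorLogEnabled) {
            loggedErrors++;
            System.err.printf("failed to send record to %s: %s%n", topic, e);
        }

        if (tolerated) {
            // Stands in for commitTaskRecord(preTransformRecord, null).
            skippedRecords++;
        } else {
            // Keep only the first failure, as in the snippet above.
            producerSendException.compareAndSet(null, e);
        }
    }
}
```

The behavior is meant to match the snippet above in all three cases
(tolerated and silent, tolerated and logged, not tolerated); only the
logging condition is merged.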