C0urante commented on PR #12602: URL: https://github.com/apache/kafka/pull/12602#issuecomment-1240821236
Thanks @rgroothuijsen! I agree with the `DEBUG` level for logging; it's tempting to make this silent, but I'd prefer to err on the side of giving users more information since we can't add logging to releases once they've gone out. RE testing, there are a few approaches I can think of: 0. Simply remove the try/catch block in `MirrorSourceTask::commitRecord`, since it essentially duplicates [logic in the Connect framework](https://github.com/apache/kafka/blob/0c97be53fa7e1e0720f2086b5d9d80ffcc1db470/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L544-L548) with the small change of using the `WARN` level for logging instead of `ERROR` 0. If we'd prefer to keep the logging as-is, we can isolate all of the logic for the method except exception handling into a package-private `doCommitRecord` method, which is then wrapped by the public `commitRecord` method. I.e.: ```java @Override public void commitRecord(SourceRecord record, RecordMetadata metadata) { try { doCommitRecord(record, metadata); } catch (Throwable e) { log.warn("Failure committing record.", e); } } // Visible for testing void doCommitRecord(SourceRecord record, RecordMetadata metadata) { if (stopping) { return; } if (!metadata.hasOffset()) { log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic()); return; } TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition()); long latency = System.currentTimeMillis() - record.timestamp(); metrics.countRecord(topicPartition); metrics.replicationLatency(topicPartition, latency); TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition()); long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset()); long downstreamOffset = metadata.offset(); maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org