C0urante commented on PR #12602:
URL: https://github.com/apache/kafka/pull/12602#issuecomment-1240821236

   Thanks @rgroothuijsen! I agree with the `DEBUG` level for logging; it's 
tempting to make this silent, but I'd prefer to err on the side of giving users 
more information since we can't add logging to releases once they've gone out.
   
   RE testing, there are a few approaches I can think of:
   
   0. Simply remove the try/catch block in `MirrorSourceTask::commitRecord`, 
since it essentially duplicates [logic in the Connect 
framework](https://github.com/apache/kafka/blob/0c97be53fa7e1e0720f2086b5d9d80ffcc1db470/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L544-L548)
 with the small change of using the `WARN` level for logging instead of `ERROR`
   0. If we'd prefer to keep the logging as-is, we can isolate all of the logic 
for the method except exception handling into a package-private 
`doCommitRecord` method, which is then wrapped by the public `commitRecord` 
method. I.e.:
   ```java
   @Override
   public void commitRecord(SourceRecord record, RecordMetadata metadata) {
       try {
           doCommitRecord(record, metadata);
       } catch (Throwable e) {
           log.warn("Failure committing record.", e);
       }
   }
   
   // Visible for testing
   void doCommitRecord(SourceRecord record, RecordMetadata metadata) {
       if (stopping) {
           return;
       }
       if (!metadata.hasOffset()) {
           log.error("RecordMetadata has no offset -- can't sync offsets for 
{}.", record.topic());
           return;
       }
       TopicPartition topicPartition = new TopicPartition(record.topic(), 
record.kafkaPartition());
       long latency = System.currentTimeMillis() - record.timestamp();
       metrics.countRecord(topicPartition);
       metrics.replicationLatency(topicPartition, latency);
       TopicPartition sourceTopicPartition = 
MirrorUtils.unwrapPartition(record.sourcePartition());
       long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
       long downstreamOffset = metadata.offset();
       maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to