abhijeetk88 commented on code in PR #16502: URL: https://github.com/apache/kafka/pull/16502#discussion_r1677489083
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -686,60 +702,98 @@ List<EpochEntry> getLeaderEpochEntries(UnifiedLog log, long startOffset, long en
     }
 
     // VisibleForTesting
-    RLMTask rlmTask(TopicIdPartition topicIdPartition) {
-        RLMTaskWithFuture task = leaderOrFollowerTasks.get(topicIdPartition);
+    RLMTask rlmCopyTask(TopicIdPartition topicIdPartition) {
+        RLMTaskWithFuture task = leaderCopyRLMTasks.get(topicIdPartition);
         if (task != null) {
             return task.rlmTask;
         }
         return null;
     }
 
-    class RLMTask extends CancellableRunnable {
+    abstract class RLMTask extends CancellableRunnable {
 
-        private final TopicIdPartition topicIdPartition;
-        private final int customMetadataSizeLimit;
+        protected final TopicIdPartition topicIdPartition;
         private final Logger logger;
 
-        private volatile int leaderEpoch = -1;
-
-        public RLMTask(TopicIdPartition topicIdPartition, int customMetadataSizeLimit) {
+        public RLMTask(TopicIdPartition topicIdPartition) {
             this.topicIdPartition = topicIdPartition;
-            this.customMetadataSizeLimit = customMetadataSizeLimit;
-            LogContext logContext = new LogContext("[RemoteLogManager=" + brokerId + " partition=" + topicIdPartition + "] ");
-            logger = logContext.logger(RLMTask.class);
+            this.logger = getLogContext(topicIdPartition).logger(RLMTask.class);
         }
 
-        boolean isLeader() {
-            return leaderEpoch >= 0;
+        protected LogContext getLogContext(TopicIdPartition topicIdPartition) {

Review Comment:
   Yes, makes sense.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org