abhijeetk88 commented on code in PR #16502: URL: https://github.com/apache/kafka/pull/16502#discussion_r1677490420
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1761,19 +1796,48 @@ public Future<Void> asyncRead(RemoteStorageFetchInfo fetchInfo, Consumer<RemoteL
                 new RemoteLogReader(fetchInfo, this, callback, brokerTopicStats, rlmFetchQuotaManager, remoteReadTimer));
     }
 
-    void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
-                                            Consumer<RLMTask> convertToLeaderOrFollower) {
-        RLMTaskWithFuture rlmTaskWithFuture = leaderOrFollowerTasks.computeIfAbsent(topicPartition,
-                topicIdPartition -> {
-                    RLMTask task = new RLMTask(topicIdPartition, rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
-                    // set this upfront when it is getting initialized instead of doing it after scheduling.
-                    convertToLeaderOrFollower.accept(task);
-                    LOGGER.info("Created a new task: {} and getting scheduled", task);
-                    ScheduledFuture<?> future = rlmScheduledThreadPool.scheduleWithFixedDelay(task, 0, delayInMs, TimeUnit.MILLISECONDS);
-                    return new RLMTaskWithFuture(task, future);
-                }
-        );
-        convertToLeaderOrFollower.accept(rlmTaskWithFuture.rlmTask);
+    void doHandleLeaderPartition(TopicIdPartition topicPartition, int leaderEpoch) {
+        RLMTaskWithFuture followerRLMTaskWithFuture = followerRLMTasks.remove(topicPartition);
+        if (followerRLMTaskWithFuture != null) {
+            LOGGER.info("Cancelling the follower task: {}", followerRLMTaskWithFuture.rlmTask);
+            followerRLMTaskWithFuture.cancel();
+        }
+
+        leaderCopyRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> {
+            RLMCopyTask task = new RLMCopyTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes(), leaderEpoch);

Review Comment:
   The leaderEpoch is only used for logging right now. If it is useful for debugging, we can keep it.

-- 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org