kamalcph commented on code in PR #16502:
URL: https://github.com/apache/kafka/pull/16502#discussion_r1673686505
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##########

```diff
@@ -241,12 +245,17 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
         indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
         delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
-        rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
+        rlmCopyThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
+                "RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-");
+        rlmExpirationThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
```

Review Comment:
We can start using the respective thread-pool size:
```
rlmConfig.remoteLogManagerExpirationThreadPoolSize()
```

########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##########

```diff
@@ -241,12 +245,17 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
         indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
         delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
-        rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
+        rlmCopyThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
+                "RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-");
+        rlmExpirationThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
+                "RLMExpirationThreadPool", "kafka-rlm-expiration-thread-pool-");
+        followerThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
+                "RLMFollowerScheduledThreadPool", "kafka-rlm-follower-thread-pool-");
```

Review Comment:
The number of threads configured by the user and actually instantiated will be 2X. Why do we need a separate follower thread pool? We can have three different tasks for clarity: RLMCopyTask, RLMExpirationTask, and RLMFollowerTask.
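A rough sketch of that pool layout: copy and follower tasks share one pool, expiration gets its own, and each pool is sized from its own config. All names below (the `newPool` helper, the stand-in size variables) are illustrative, not the PR's actual code:

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: not the PR's RLMScheduledThreadPool.
class RlmPoolSketch {
    // Build a named scheduled pool, similar in spirit to RLMScheduledThreadPool.
    static ScheduledThreadPoolExecutor newPool(int size, String threadPrefix) {
        AtomicInteger seq = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, threadPrefix + seq.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
        return new ScheduledThreadPoolExecutor(size, factory);
    }

    public static void main(String[] args) {
        int copyPoolSize = 10;       // stand-in for a copy-pool size config
        int expirationPoolSize = 10; // stand-in for remoteLogManagerExpirationThreadPoolSize()

        // Copy and follower tasks share one pool; expiration gets its own.
        // Only two pools exist, so the user-configured thread count is not doubled.
        ScheduledThreadPoolExecutor copyAndFollowerPool =
                newPool(copyPoolSize, "kafka-rlm-copy-thread-pool-");
        ScheduledThreadPoolExecutor expirationPool =
                newPool(expirationPoolSize, "kafka-rlm-expiration-thread-pool-");

        System.out.println(copyAndFollowerPool.getCorePoolSize() + expirationPool.getCorePoolSize());
        copyAndFollowerPool.shutdown();
        expirationPool.shutdown();
    }
}
```

With this wiring, total thread count tracks the two configured sizes directly instead of tripling when a third pool is added.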
RLMCopyTask and RLMFollowerTask can reuse the same thread pool. WDYT?

########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##########

```diff
@@ -686,60 +702,98 @@ List<EpochEntry> getLeaderEpochEntries(UnifiedLog log, long startOffset, long en
     }
 
     // VisibleForTesting
-    RLMTask rlmTask(TopicIdPartition topicIdPartition) {
-        RLMTaskWithFuture task = leaderOrFollowerTasks.get(topicIdPartition);
+    RLMTask rlmCopyTask(TopicIdPartition topicIdPartition) {
+        RLMTaskWithFuture task = leaderCopyRLMTasks.get(topicIdPartition);
         if (task != null) {
             return task.rlmTask;
         }
         return null;
     }
 
-    class RLMTask extends CancellableRunnable {
+    abstract class RLMTask extends CancellableRunnable {
 
-        private final TopicIdPartition topicIdPartition;
-        private final int customMetadataSizeLimit;
+        protected final TopicIdPartition topicIdPartition;
         private final Logger logger;
 
-        private volatile int leaderEpoch = -1;
-
-        public RLMTask(TopicIdPartition topicIdPartition, int customMetadataSizeLimit) {
+        public RLMTask(TopicIdPartition topicIdPartition) {
             this.topicIdPartition = topicIdPartition;
-            this.customMetadataSizeLimit = customMetadataSizeLimit;
-            LogContext logContext = new LogContext("[RemoteLogManager=" + brokerId + " partition=" + topicIdPartition + "] ");
-            logger = logContext.logger(RLMTask.class);
+            this.logger = getLogContext(topicIdPartition).logger(RLMTask.class);
         }
 
-        boolean isLeader() {
-            return leaderEpoch >= 0;
+        protected LogContext getLogContext(TopicIdPartition topicIdPartition) {
```

Review Comment:
We can avoid taking the `topicIdPartition` parameter since it is already available as a class field.

########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##########

```diff
@@ -1761,19 +1796,48 @@ public Future<Void> asyncRead(RemoteStorageFetchInfo fetchInfo, Consumer<RemoteL
                 new RemoteLogReader(fetchInfo, this, callback, brokerTopicStats, rlmFetchQuotaManager, remoteReadTimer));
     }
 
-    void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
-                                            Consumer<RLMTask> convertToLeaderOrFollower) {
-        RLMTaskWithFuture rlmTaskWithFuture = leaderOrFollowerTasks.computeIfAbsent(topicPartition,
-            topicIdPartition -> {
-                RLMTask task = new RLMTask(topicIdPartition, rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
-                // set this upfront when it is getting initialized instead of doing it after scheduling.
-                convertToLeaderOrFollower.accept(task);
-                LOGGER.info("Created a new task: {} and getting scheduled", task);
-                ScheduledFuture<?> future = rlmScheduledThreadPool.scheduleWithFixedDelay(task, 0, delayInMs, TimeUnit.MILLISECONDS);
-                return new RLMTaskWithFuture(task, future);
-            }
-        );
-        convertToLeaderOrFollower.accept(rlmTaskWithFuture.rlmTask);
+    void doHandleLeaderPartition(TopicIdPartition topicPartition, int leaderEpoch) {
+        RLMTaskWithFuture followerRLMTaskWithFuture = followerRLMTasks.remove(topicPartition);
+        if (followerRLMTaskWithFuture != null) {
+            LOGGER.info("Cancelling the follower task: {}", followerRLMTaskWithFuture.rlmTask);
+            followerRLMTaskWithFuture.cancel();
+        }
+
+        leaderCopyRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> {
+            RLMCopyTask task = new RLMCopyTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes(), leaderEpoch);
```

Review Comment:
Can there be a leaderEpoch bump while the same node remains the leader? If yes, the leaderEpoch maintained inside the RLMCopyTask might become stale. Currently, leaderEpoch is unused inside the RLMCopyTask, so we can refactor the code to remove it later.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
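To illustrate the staleness concern: if the epoch is fixed once at task construction, an epoch bump on a broker that stays leader never reaches the long-lived task. A minimal hypothetical sketch (none of these names are the PR's actual code) of refreshing the epoch through a leadership callback instead:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: shows why an epoch captured at construction can go
// stale, and how a leadership callback keeps it fresh without recreating the task.
class LeaderEpochSketch {
    // Hypothetical stand-in for a copy task; not the PR's RLMCopyTask.
    static class CopyTask {
        private final AtomicInteger leaderEpoch;

        CopyTask(int initialEpoch) {
            this.leaderEpoch = new AtomicInteger(initialEpoch);
        }

        // Called again when the same broker stays leader but the epoch is bumped.
        void onLeadershipChange(int newEpoch) {
            leaderEpoch.set(newEpoch);
        }

        int currentEpoch() {
            return leaderEpoch.get();
        }
    }

    public static void main(String[] args) {
        CopyTask task = new CopyTask(5);
        // Epoch bump without a leader change: refresh the running task's view
        // rather than leaving it pinned to the constructor-time value.
        task.onLeadershipChange(6);
        System.out.println(task.currentEpoch());
    }
}
```

If the epoch really is unused inside the copy task, dropping the constructor parameter entirely (as the comment suggests) sidesteps the problem altogether.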