abhijeetk88 commented on code in PR #16502:
URL: https://github.com/apache/kafka/pull/16502#discussion_r1673509846
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -686,60 +702,98 @@ List<EpochEntry> getLeaderEpochEntries(UnifiedLog log, long startOffset, long en
     }
 
     // VisibleForTesting
-    RLMTask rlmTask(TopicIdPartition topicIdPartition) {
-        RLMTaskWithFuture task = leaderOrFollowerTasks.get(topicIdPartition);
+    RLMTask rlmCopyTask(TopicIdPartition topicIdPartition) {
+        RLMTaskWithFuture task = leaderCopyRLMTasks.get(topicIdPartition);
         if (task != null) {
             return task.rlmTask;
         }
         return null;
     }
 
-    class RLMTask extends CancellableRunnable {
+    abstract class RLMTask extends CancellableRunnable {
 
-        private final TopicIdPartition topicIdPartition;
-        private final int customMetadataSizeLimit;
+        protected final TopicIdPartition topicIdPartition;
         private final Logger logger;
 
-        private volatile int leaderEpoch = -1;
-
-        public RLMTask(TopicIdPartition topicIdPartition, int customMetadataSizeLimit) {
+        public RLMTask(TopicIdPartition topicIdPartition) {
             this.topicIdPartition = topicIdPartition;
-            this.customMetadataSizeLimit = customMetadataSizeLimit;
-            LogContext logContext = new LogContext("[RemoteLogManager=" + brokerId + " partition=" + topicIdPartition + "] ");
-            logger = logContext.logger(RLMTask.class);
+            this.logger = getLogContext(topicIdPartition).logger(RLMTask.class);
         }
 
-        boolean isLeader() {
-            return leaderEpoch >= 0;
+        protected LogContext getLogContext(TopicIdPartition topicIdPartition) {
+            return new LogContext("[RemoteLogManager=" + brokerId + " partition=" + topicIdPartition + "] ");
         }
 
-        // The copied and log-start offset is empty initially for a new leader RLMTask, and needs to be fetched inside
+        public void run() {
+            if (isCancelled())
+                return;
+
+            try {
+                Optional<UnifiedLog> unifiedLogOptional = fetchLog.apply(topicIdPartition.topicPartition());
+
+                if (!unifiedLogOptional.isPresent()) {
+                    return;
+                }
+
+                execute(unifiedLogOptional.get());
+            } catch (InterruptedException ex) {
+                if (!isCancelled()) {
+                    logger.warn("Current thread for topic-partition-id {} is interrupted", topicIdPartition, ex);
+                }
+            } catch (RetriableException ex) {
+                logger.debug("Encountered a retryable error while executing current task for topic-partition {}", topicIdPartition, ex);
+            } catch (Exception ex) {
+                if (!isCancelled()) {
+                    logger.warn("Current task for topic-partition {} received error but it will be scheduled", topicIdPartition, ex);
+                }
+            }
+        }
+
+        protected abstract void execute(UnifiedLog log) throws InterruptedException, RemoteStorageException, ExecutionException;
+
+        public String toString() {
+            return this.getClass() + "[" + topicIdPartition + "]";
+        }
+    }
+
+    class RLMCopyTask extends RLMTask {
+        private final int customMetadataSizeLimit;
+        private final int leaderEpoch;
+        private final Logger logger;
+
+        // The copied and log-start offset is empty initially for a new RLMCopyTask, and needs to be fetched inside
         // the task's run() method.
         private volatile Optional<OffsetAndEpoch> copiedOffsetOption = Optional.empty();
-        private volatile boolean isLogStartOffsetUpdatedOnBecomingLeader = false;
+        private volatile boolean isLogStartOffsetUpdated = false;
         private volatile Optional<String> logDirectory = Optional.empty();
 
-        public void convertToLeader(int leaderEpochVal) {
+        public RLMCopyTask(TopicIdPartition topicIdPartition, int customMetadataSizeLimit, int leaderEpochVal) {
+            super(topicIdPartition);
+            this.customMetadataSizeLimit = customMetadataSizeLimit;
             if (leaderEpochVal < 0) {
                 throw new KafkaException("leaderEpoch value for topic partition " + topicIdPartition + " can not be negative");
             }
-            if (this.leaderEpoch != leaderEpochVal) {
-                leaderEpoch = leaderEpochVal;
-            }
-            // Reset copied and log-start offset, so that it is set in next run of RLMTask
-            copiedOffsetOption = Optional.empty();
-            isLogStartOffsetUpdatedOnBecomingLeader = false;
+            this.leaderEpoch = leaderEpochVal;
+            this.logger = getLogContext(topicIdPartition).logger(RLMCopyTask.class);
         }
 
-        public void convertToFollower() {
-            leaderEpoch = -1;
+        @Override
+        protected void execute(UnifiedLog log) throws InterruptedException {
+            // In the first run after completing altering logDir within broker, we should make sure the state is reset. (KAFKA-16711)
+            if (!log.parentDir().equals(logDirectory.orElse(null))) {
+                copiedOffsetOption = Optional.empty();
+                isLogStartOffsetUpdated = false;
+                logDirectory = Optional.of(log.parentDir());
+            }
+

Review Comment:
   The three variables don't seem to be used in the follower. They are not part of RLMFollowerTask either; if they were used there, the code wouldn't compile.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
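To make the review comment concrete, here is a minimal, hypothetical sketch of the class split being discussed. Task, CopyTask and FollowerTask are illustrative stand-ins, not the actual Kafka classes: the copy-only state is declared on the copy subclass, so a follower task has no such fields in scope and any reference to them would fail to compile.

    import java.util.Optional;

    // Base task: shared identity and run/execute plumbing, no copy-specific state.
    abstract class Task implements Runnable {
        protected final String partition;

        Task(String partition) {
            this.partition = partition;
        }

        @Override
        public void run() {
            execute();
        }

        protected abstract void execute();
    }

    // Copy task: the three pieces of state live only here.
    class CopyTask extends Task {
        private volatile Optional<Long> copiedOffset = Optional.empty();
        private volatile boolean logStartOffsetUpdated = false;
        private volatile Optional<String> logDirectory = Optional.empty();

        CopyTask(String partition) {
            super(partition);
        }

        @Override
        protected void execute() {
            String currentDir = "/tmp/kafka-logs-0";   // stand-in for log.parentDir()
            // Reset the copy state on the first run after the log directory changed,
            // mirroring the KAFKA-16711 handling in the diff above.
            if (!currentDir.equals(logDirectory.orElse(null))) {
                copiedOffset = Optional.empty();
                logStartOffsetUpdated = false;
                logDirectory = Optional.of(currentDir);
            }
        }
    }

    // Follower task: no copy state here; a line like `copiedOffset = Optional.empty();`
    // in this body would not compile, which is the point made in the comment above.
    class FollowerTask extends Task {
        FollowerTask(String partition) {
            super(partition);
        }

        @Override
        protected void execute() {
            // follower-side work only
        }
    }

The same shape applies to the RLMCopyTask/RLMFollowerTask split in the PR: because the fields are declared on the copy subclass rather than on the abstract RLMTask, the follower path cannot depend on them.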