showuon commented on code in PR #16502:
URL: https://github.com/apache/kafka/pull/16502#discussion_r1673519734


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -686,60 +702,98 @@ List<EpochEntry> getLeaderEpochEntries(UnifiedLog log, 
long startOffset, long en
     }
 
     // VisibleForTesting
-    RLMTask rlmTask(TopicIdPartition topicIdPartition) {
-        RLMTaskWithFuture task = leaderOrFollowerTasks.get(topicIdPartition);
+    RLMTask rlmCopyTask(TopicIdPartition topicIdPartition) {
+        RLMTaskWithFuture task = leaderCopyRLMTasks.get(topicIdPartition);
         if (task != null) {
             return task.rlmTask;
         }
         return null;
     }
 
-    class RLMTask extends CancellableRunnable {
+    abstract class RLMTask extends CancellableRunnable {
 
-        private final TopicIdPartition topicIdPartition;
-        private final int customMetadataSizeLimit;
+        protected final TopicIdPartition topicIdPartition;
         private final Logger logger;
 
-        private volatile int leaderEpoch = -1;
-
-        public RLMTask(TopicIdPartition topicIdPartition, int 
customMetadataSizeLimit) {
+        public RLMTask(TopicIdPartition topicIdPartition) {
             this.topicIdPartition = topicIdPartition;
-            this.customMetadataSizeLimit = customMetadataSizeLimit;
-            LogContext logContext = new LogContext("[RemoteLogManager=" + 
brokerId + " partition=" + topicIdPartition + "] ");
-            logger = logContext.logger(RLMTask.class);
+            this.logger = 
getLogContext(topicIdPartition).logger(RLMTask.class);
         }
 
-        boolean isLeader() {
-            return leaderEpoch >= 0;
+        protected LogContext getLogContext(TopicIdPartition topicIdPartition) {
+            return new LogContext("[RemoteLogManager=" + brokerId + " 
partition=" + topicIdPartition + "] ");
         }
 
-        // The copied and log-start offset is empty initially for a new leader 
RLMTask, and needs to be fetched inside
+        public void run() {
+            if (isCancelled())
+                return;
+
+            try {
+                Optional<UnifiedLog> unifiedLogOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
+
+                if (!unifiedLogOptional.isPresent()) {
+                    return;
+                }
+
+                execute(unifiedLogOptional.get());
+            } catch (InterruptedException ex) {
+                if (!isCancelled()) {
+                    logger.warn("Current thread for topic-partition-id {} is 
interrupted", topicIdPartition, ex);
+                }
+            } catch (RetriableException ex) {
+                logger.debug("Encountered a retryable error while executing 
current task for topic-partition {}", topicIdPartition, ex);
+            } catch (Exception ex) {
+                if (!isCancelled()) {
+                    logger.warn("Current task for topic-partition {} received 
error but it will be scheduled", topicIdPartition, ex);
+                }
+            }
+        }
+
+        protected abstract void execute(UnifiedLog log) throws 
InterruptedException, RemoteStorageException, ExecutionException;
+
+        public String toString() {
+            return this.getClass() + "[" + topicIdPartition + "]";
+        }
+    }
+
+    class RLMCopyTask extends RLMTask {
+        private final int customMetadataSizeLimit;
+        private final int leaderEpoch;
+        private final Logger logger;
+
+        // The copied and log-start offset is empty initially for a new 
RLMCopyTask, and needs to be fetched inside
         // the task's run() method.
         private volatile Optional<OffsetAndEpoch> copiedOffsetOption = 
Optional.empty();
-        private volatile boolean isLogStartOffsetUpdatedOnBecomingLeader = 
false;
+        private volatile boolean isLogStartOffsetUpdated = false;
         private volatile Optional<String> logDirectory = Optional.empty();
 
-        public void convertToLeader(int leaderEpochVal) {
+        public RLMCopyTask(TopicIdPartition topicIdPartition, int 
customMetadataSizeLimit, int leaderEpochVal) {
+            super(topicIdPartition);
+            this.customMetadataSizeLimit = customMetadataSizeLimit;
             if (leaderEpochVal < 0) {
                 throw new KafkaException("leaderEpoch value for topic 
partition " + topicIdPartition + " can not be negative");
             }
-            if (this.leaderEpoch != leaderEpochVal) {
-                leaderEpoch = leaderEpochVal;
-            }
-            // Reset copied and log-start offset, so that it is set in next 
run of RLMTask
-            copiedOffsetOption = Optional.empty();
-            isLogStartOffsetUpdatedOnBecomingLeader = false;
+            this.leaderEpoch = leaderEpochVal;
+            this.logger = 
getLogContext(topicIdPartition).logger(RLMCopyTask.class);
         }
 
-        public void convertToFollower() {
-            leaderEpoch = -1;
+        @Override
+        protected void execute(UnifiedLog log) throws InterruptedException {
+            // In the first run after completing altering logDir within 
broker, we should make sure the state is reset. (KAFKA-16711)
+            if (!log.parentDir().equals(logDirectory.orElse(null))) {
+                copiedOffsetOption = Optional.empty();
+                isLogStartOffsetUpdated = false;
+                logDirectory = Optional.of(log.parentDir());
+            }
+

Review Comment:
   Makes sense! Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to