dengziming commented on code in PR #13799:
URL: https://github.com/apache/kafka/pull/13799#discussion_r1213858197


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -425,25 +430,24 @@ public void accept(ConfigResource configResource) {
 
     public static final String CONTROLLER_THREAD_SUFFIX = "QuorumControllerEventHandler";
 
-    private static final String ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX =
-        "The active controller appears to be node ";
-
-    private NotControllerException newNotControllerException() {
-        OptionalInt latestController = raftClient.leaderAndEpoch().leaderId();
-        if (latestController.isPresent()) {
-            return new NotControllerException(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX +
-                latestController.getAsInt() + ".");
-        } else {
-            return new NotControllerException("No controller appears to be active.");
-        }
+    private OptionalInt latestController() {
+        return raftClient.leaderAndEpoch().leaderId();
     }
 
-    private NotControllerException newPreMigrationException() {
-        OptionalInt latestController = raftClient.leaderAndEpoch().leaderId();
-        if (latestController.isPresent()) {
-            return new NotControllerException("The controller is in pre-migration mode.");
+    /**
+     * @return          The offset that we should perform read operations at.
+     */
+    private long currentReadOffset() {
+        if (isActiveController()) {
+            // The active controller keeps an in-memory snapshot at the last committed offset,
+            // which we want to read from when performing read operations. This will avoid
+            // reading uncommitted data.
+            return lastCommittedOffset;
         } else {
-            return new NotControllerException("No controller appears to be active.");
+            // Standby controllers never have uncommitted data in memory. Therefore, we return
+            // Long.MAX_VALUE, a special value which means "always read the latest from every
+            // data structure."
+            return Long.MAX_VALUE;

Review Comment:
   How about using `SnapshotRegistry.LATEST_EPOCH` here instead of the bare `Long.MAX_VALUE`?
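
A minimal sketch of the idea, assuming `SnapshotRegistry.LATEST_EPOCH` is defined as `Long.MAX_VALUE`. The class `ReadOffsetSketch`, its local `LATEST_EPOCH` constant, and the method parameters are hypothetical stand-ins so the snippet compiles by itself; this is not the actual `QuorumController` code:

```java
public class ReadOffsetSketch {
    // Hypothetical stand-in for SnapshotRegistry.LATEST_EPOCH.
    // Assumption: the real constant equals Long.MAX_VALUE.
    static final long LATEST_EPOCH = Long.MAX_VALUE;

    // Mirrors the shape of currentReadOffset() in the diff, with the
    // controller state passed in as parameters for illustration only.
    static long currentReadOffset(boolean isActiveController, long lastCommittedOffset) {
        if (isActiveController) {
            // Active controller: read from the snapshot at the last committed offset.
            return lastCommittedOffset;
        } else {
            // Standby controller: the named constant documents the intent
            // ("always read the latest") better than a bare Long.MAX_VALUE.
            return LATEST_EPOCH;
        }
    }

    public static void main(String[] args) {
        System.out.println(currentReadOffset(true, 42L));
        System.out.println(currentReadOffset(false, 42L) == LATEST_EPOCH);
    }
}
```

The returned value would be unchanged under that assumption; only the name at the return site differs.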


