FrankYang0529 commented on code in PR #21273:
URL: https://github.com/apache/kafka/pull/21273#discussion_r2816704674


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1667,6 +1667,9 @@ ControllerResult<BrokerHeartbeatReply> 
processBrokerHeartbeat(
         if 
(featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
             handleDirectoriesOffline(brokerId, brokerEpoch, 
request.offlineLogDirs(), records);
         }
+        if 
(featureControl.metadataVersionOrThrow().isCordonedLogDirsSupported()) {
+            clusterControl.updateCordonedLogDirs(brokerId, 
request.cordonedLogDirs());

Review Comment:
   It looks like this only change value in memory, but doesn't generate a 
`BrokerRegistrationChangeRecord`. Do we need to add this record to quorum?



##########
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java:
##########
@@ -367,17 +394,20 @@ public String toString() {
                 ", inControlledShutdown=" + inControlledShutdown +
                 ", isMigratingZkBroker=" + isMigratingZkBroker +
                 ", directories=" + directories +
+                ", cordonedDirectories=" + cordonedDirectories +
                 ")";
     }
 
     public BrokerRegistration cloneWith(
         Optional<Boolean> fencingChange,
         Optional<Boolean> inControlledShutdownChange,
-        Optional<List<Uuid>> directoriesChange
+        Optional<List<Uuid>> directoriesChange,
+        Optional<List<Uuid>> cordonedDirectoriesChange
     ) {
         boolean newFenced = fencingChange.orElse(fenced);
         boolean newInControlledShutdownChange = 
inControlledShutdownChange.orElse(inControlledShutdown);
         List<Uuid> newDirectories = directoriesChange.orElse(directories);
+        List<Uuid> newCordonedDirectories = 
cordonedDirectoriesChange.orElse(cordonedDirectories);
 
         if (newFenced == fenced && newInControlledShutdownChange == 
inControlledShutdown && newDirectories.equals(directories))
             return this;

Review Comment:
   Should we also consider only `cordonedDirectoriesChange` change case? For 
example, `fenced`, `inControlledShutdown`, and `directories` are equal, so the 
function returns and ignores `cordonedDirectories` change?



##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -100,6 +100,7 @@ class LogManager(logDirs: Seq[File],
   private val strayLogs = new ConcurrentHashMap[TopicPartition, UnifiedLog]()
 
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = 
createAndValidateLogDirs(logDirs, initialOfflineDirs)
+  private var _cordonedLogDirs: Set[String] = Set()

Review Comment:
   This field can be updated. Do we need to add `@volatile` as 
`_currentDefaultConfig` and `numRecoveryThreadsPerDataDir`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to