chia7712 commented on code in PR #18845:
URL: https://github.com/apache/kafka/pull/18845#discussion_r1949874774


##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -328,15 +314,10 @@ private ApiError updateMetadataVersion(
         try {
             newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel);
         } catch (IllegalArgumentException e) {
-            return invalidMetadataVersion(newVersionLevel, "Unknown 
metadata.version.");
+            return invalidMetadataVersion(newVersionLevel, "Valid versions are 
from "

Review Comment:
   Should we enhance the error message for `fromFeatureLevel` as well?



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1652,9 +1652,7 @@ private void registerWriteNoOpRecord(long 
maxIdleIntervalNs) {
         periodicControl.registerTask(new PeriodicTask("writeNoOpRecord",
             () -> {
                 ArrayList<ApiMessageAndVersion> records = new ArrayList<>(1);
-                if (featureControl.metadataVersion().isNoOpRecordSupported()) {
-                    records.add(new ApiMessageAndVersion(new NoOpRecord(), 
(short) 0));
-                }
+                records.add(new ApiMessageAndVersion(new NoOpRecord(), (short) 
0));

Review Comment:
   Maybe we can streamline the code?
   ```
   () -> ControllerResult.of(List.of(new ApiMessageAndVersion(new NoOpRecord(), 
(short) 0)), false),
   ```
   



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1424,15 +1406,10 @@ void handleBrokerUnregistered(int brokerId, long 
brokerEpoch,
      * @param records       The record list to append to.
      */
     void handleBrokerUnfenced(int brokerId, long brokerEpoch, 
List<ApiMessageAndVersion> records) {
-        if 
(featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) {
-            records.add(new ApiMessageAndVersion(new 
BrokerRegistrationChangeRecord().
-                setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
-                setFenced(BrokerRegistrationFencingChange.UNFENCE.value()),
-                (short) 0));
-        } else {
-            records.add(new ApiMessageAndVersion(new 
UnfenceBrokerRecord().setId(brokerId).
-                setEpoch(brokerEpoch), (short) 0));
-        }
+        records.add(new ApiMessageAndVersion(new 
BrokerRegistrationChangeRecord().

Review Comment:
   ditto



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1379,16 +1367,10 @@ void handleBrokerFenced(int brokerId, 
List<ApiMessageAndVersion> records) {
         }
         generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, NO_LEADER, 
NO_LEADER, records,
             brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
-        if 
(featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) {
-            records.add(new ApiMessageAndVersion(new 
BrokerRegistrationChangeRecord().
-                    
setBrokerId(brokerId).setBrokerEpoch(brokerRegistration.epoch()).
-                    setFenced(BrokerRegistrationFencingChange.FENCE.value()),
-                    (short) 0));
-        } else {
-            records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
-                    setId(brokerId).setEpoch(brokerRegistration.epoch()),
-                    (short) 0));
-        }
+        records.add(new ApiMessageAndVersion(new 
BrokerRegistrationChangeRecord().

Review Comment:
   Could you please update comment of `handleBrokerFenced` according to this 
change?



##########
metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java:
##########
@@ -126,7 +125,7 @@ public void testFeatureLevelForFeature() {
     static final List<ApiMessageAndVersion> RECORDS_WITH_OLD_METADATA_VERSION 
= Collections.singletonList(
             new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName(FEATURE_NAME).
-                setFeatureLevel(IBP_3_0_IV1.featureLevel()), (short) 0));
+                setFeatureLevel((short) 1), (short) 0));

Review Comment:
   This feature version is not supported any more, so we should either remove 
or modify the test.



##########
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java:
##########
@@ -128,18 +126,6 @@ public void testReassignment() throws Exception {
         executeAndVerifyReassignment();
     }
 
-    @ClusterTests({
-        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, metadataVersion = 
IBP_3_3_IV0)
-    })
-    public void testReassignmentWithAlterPartitionDisabled() throws Exception {
-        // Test reassignment when the IBP is on an older version which does 
not use
-        // the `AlterPartition` API. In this case, the controller will 
register individual
-        // watches for each reassigning partition so that the reassignment can 
be
-        // completed as soon as the ISR is expanded.

Review Comment:
   It seems to me the behavior is related to zk only, and in kraft mode 
`AlterPartition` should be supported totally. Hence, it is fine to remove the 
test.
   
   BTW, the test case for kraft is unnecessary - I overlook the scenario in 
merging #15675



##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java:
##########
@@ -86,12 +84,7 @@ BootstrapMetadata readFromConfiguration() {
             return 
BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(), "the default 
bootstrap");
         }
         MetadataVersion version = MetadataVersion.fromVersionString(ibp.get());
-        if (version.isLessThan(MINIMUM_BOOTSTRAP_VERSION)) {

Review Comment:
   the `ibp` is always empty after the config is removed by #18566
   
   I had opened https://issues.apache.org/jira/browse/KAFKA-18740 to cleanup 
it. Maybe this PR can include the cleanup as well?



##########
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java:
##########
@@ -293,11 +293,7 @@ public ApiMessageAndVersion toRecord(ImageWriterOptions 
options) {
             setFenced(fenced);
 
         if (inControlledShutdown) {
-            if 
(options.metadataVersion().isInControlledShutdownStateSupported()) {
-                registrationRecord.setInControlledShutdown(true);
-            } else {
-                options.handleLoss("the inControlledShutdown state of one or 
more brokers");
-            }
+            registrationRecord.setInControlledShutdown(true);

Review Comment:
   Maybe we can set the flag in line#294. 
   ```
           RegisterBrokerRecord registrationRecord = new RegisterBrokerRecord().
               setBrokerId(id).
               setRack(rack.orElse(null)).
               setBrokerEpoch(epoch).
               setIncarnationId(incarnationId).
               setFenced(fenced).
               setInControlledShutdown(inControlledShutdown);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to