chia7712 commented on code in PR #18845: URL: https://github.com/apache/kafka/pull/18845#discussion_r1949874774
########## metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java: ########## @@ -328,15 +314,10 @@ private ApiError updateMetadataVersion( try { newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel); } catch (IllegalArgumentException e) { - return invalidMetadataVersion(newVersionLevel, "Unknown metadata.version."); + return invalidMetadataVersion(newVersionLevel, "Valid versions are from " Review Comment: Should we enhance the error message for `fromFeatureLevel` as well? ########## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ########## @@ -1652,9 +1652,7 @@ private void registerWriteNoOpRecord(long maxIdleIntervalNs) { periodicControl.registerTask(new PeriodicTask("writeNoOpRecord", () -> { ArrayList<ApiMessageAndVersion> records = new ArrayList<>(1); - if (featureControl.metadataVersion().isNoOpRecordSupported()) { - records.add(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)); - } + records.add(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)); Review Comment: Maybe we can streamline the code? ``` () -> ControllerResult.of(List.of(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)), false), ``` ########## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ########## @@ -1424,15 +1406,10 @@ void handleBrokerUnregistered(int brokerId, long brokerEpoch, * @param records The record list to append to. */ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) { - if (featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) { - records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord(). - setBrokerId(brokerId).setBrokerEpoch(brokerEpoch). - setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), - (short) 0)); - } else { - records.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().setId(brokerId). - setEpoch(brokerEpoch), (short) 0)); - } + records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord(). Review Comment: ditto ########## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ########## @@ -1379,16 +1367,10 @@ void handleBrokerFenced(int brokerId, List<ApiMessageAndVersion> records) { } generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, NO_LEADER, NO_LEADER, records, brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); - if (featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) { - records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord(). - setBrokerId(brokerId).setBrokerEpoch(brokerRegistration.epoch()). - setFenced(BrokerRegistrationFencingChange.FENCE.value()), - (short) 0)); - } else { - records.add(new ApiMessageAndVersion(new FenceBrokerRecord(). - setId(brokerId).setEpoch(brokerRegistration.epoch()), - (short) 0)); - } + records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord(). Review Comment: Could you please update comment of `handleBrokerFenced` according to this change? ########## metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java: ########## @@ -126,7 +125,7 @@ public void testFeatureLevelForFeature() { static final List<ApiMessageAndVersion> RECORDS_WITH_OLD_METADATA_VERSION = Collections.singletonList( new ApiMessageAndVersion(new FeatureLevelRecord(). setName(FEATURE_NAME). - setFeatureLevel(IBP_3_0_IV1.featureLevel()), (short) 0)); + setFeatureLevel((short) 1), (short) 0)); Review Comment: This feature version is not supported any more, so we should either remove or modify the test. ########## tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java: ########## @@ -128,18 +126,6 @@ public void testReassignment() throws Exception { executeAndVerifyReassignment(); } - @ClusterTests({ - @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, metadataVersion = IBP_3_3_IV0) - }) - public void testReassignmentWithAlterPartitionDisabled() throws Exception { - // Test reassignment when the IBP is on an older version which does not use - // the `AlterPartition` API. In this case, the controller will register individual - // watches for each reassigning partition so that the reassignment can be - // completed as soon as the ISR is expanded. Review Comment: It seems to me the behavior is related to zk only, and in kraft mode `AlterPartition` should be supported totally. Hence, it is fine to remove the test. BTW, the test case for kraft is unnecessary - I overlook the scenario in merging #15675 ########## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java: ########## @@ -86,12 +84,7 @@ BootstrapMetadata readFromConfiguration() { return BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(), "the default bootstrap"); } MetadataVersion version = MetadataVersion.fromVersionString(ibp.get()); - if (version.isLessThan(MINIMUM_BOOTSTRAP_VERSION)) { Review Comment: the `ibp` is always empty after the config is removed by #18566 I had opened https://issues.apache.org/jira/browse/KAFKA-18740 to cleanup it. Maybe this PR can include the cleanup as well? ########## metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java: ########## @@ -293,11 +293,7 @@ public ApiMessageAndVersion toRecord(ImageWriterOptions options) { setFenced(fenced); if (inControlledShutdown) { - if (options.metadataVersion().isInControlledShutdownStateSupported()) { - registrationRecord.setInControlledShutdown(true); - } else { - options.handleLoss("the inControlledShutdown state of one or more brokers"); - } + registrationRecord.setInControlledShutdown(true); Review Comment: Maybe we can set the flag in line#294. ``` RegisterBrokerRecord registrationRecord = new RegisterBrokerRecord(). setBrokerId(id). setRack(rack.orElse(null)). setBrokerEpoch(epoch). setIncarnationId(incarnationId). setFenced(fenced). setInControlledShutdown(inControlledShutdown); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org