mumrah commented on code in PR #12207:
URL: https://github.com/apache/kafka/pull/12207#discussion_r881925451
##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########

@@ -94,35 +138,6 @@ ControllerResult<Map<String, ApiError>> updateFeatures(
         }
     }
-    ControllerResult<Map<String, ApiError>> initializeMetadataVersion(short initVersion) {

Review Comment:
   Yeah, this was left over from a previous iteration of the bootstrapping logic, where the controller was just getting the initial metadata version.

##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########

@@ -264,11 +265,11 @@ FinalizedControllerFeatures finalizedFeatures(long epoch) {
     }
     public void replay(FeatureLevelRecord record) {
-        if (!canSupportVersion(record.name(), record.featureLevel())) {

Review Comment:
   This made me wonder about bootstrapping and whether we could load in an unknown version. However, looking at BootstrapMetadata#load, we parse the feature level as a MetadataVersion before returning, so we would throw an exception as we read the bootstrap records into memory. This could happen if someone generates a bootstrap snapshot at version N and then tries to load it into a cluster at N-1 (see the bootstrap-parsing sketch below).

##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########

@@ -234,9 +231,13 @@ private ApiError updateMetadataVersion(
         boolean metadataChanged = MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion);
         if (!metadataChanged) {
             log.info("Downgrading metadata.version from {} to {}.", currentVersion, newVersion);
+        } else if (allowUnsafeDowngrade) {
+            log.info("Downgrading metadata.version unsafely from {} to {}.", currentVersion, newVersion);

Review Comment:
   If we want to allow unsafe downgrades, we should update the message just below (see the downgrade-logging sketch below).

##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########

@@ -208,19 +218,6 @@ private ApiError updateMetadataVersion(
         boolean allowUnsafeDowngrade,
         Consumer<ApiMessageAndVersion> recordConsumer
     ) {
-        Optional<VersionRange> quorumSupported = quorumFeatures.quorumSupportedFeature(MetadataVersion.FEATURE_NAME);
-        if (!quorumSupported.isPresent()) {
-            return invalidMetadataVersion(newVersionLevel, "The quorum does not support metadata.version.");
-        }
-
-        if (newVersionLevel <= 0) {

Review Comment:
   Should we allow a `metadata.version` of zero? It seems like it might complicate things to support a non-present metadata.version at runtime. Would that mean we need to revert to reading the IBP from config?

##########
metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java:
##########

@@ -75,47 +76,50 @@ public static Map<String, VersionRange> defaultFeatureMap() {
         return features;
     }
-    Optional<VersionRange> quorumSupportedFeature(String featureName) {
-        List<VersionRange> supportedVersions = new ArrayList<>(quorumNodeIds.size());
-        for (int nodeId : quorumNodeIds) {
-            if (nodeId == this.nodeId) {
-                // We get this node's features from "supportedFeatures"
-                continue;
+    /**
+     * Return the reason a specific feature level is not supported, or Optional.empty if it is supported.
+     *
+     * @param featureName The feature name.
+     * @param level The feature level.
+     * @return The reason why the feature level is not supported, or Optional.empty if it is supported.
+     */
+    public Optional<String> reasonNotSupported(String featureName, short level) {

Review Comment:
   The name sort of sounds like we're expecting it not to be supported. Maybe call it something like `checkIfQuorumSupported`? I think it would be good to include "quorum" in the name to avoid confusion with the "local" compatibility check.
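For illustration of the bootstrap-parsing point above, here is a rough, hypothetical sketch of how loading a bootstrap snapshot written at metadata.version N into an N-1 cluster fails while the records are being read, before `FeatureControlManager#replay` is ever invoked. It assumes the `MetadataVersion` enum exposes `latest()`, `featureLevel()`, and `fromFeatureLevel(short)` roughly as in this codebase, and that the lookup throws `IllegalArgumentException` for unknown levels; it is not the actual `BootstrapMetadata#load` code.

```java
import org.apache.kafka.server.common.MetadataVersion;

public class BootstrapVersionParseSketch {
    public static void main(String[] args) {
        // Pretend the bootstrap snapshot was produced by a newer cluster ("version N"):
        // one feature level past anything this build knows about.
        short recordedLevel = (short) (MetadataVersion.latest().featureLevel() + 1);
        try {
            // BootstrapMetadata#load is described above as parsing the recorded feature
            // level into a MetadataVersion before returning, so an unknown level fails here.
            MetadataVersion parsed = MetadataVersion.fromFeatureLevel(recordedLevel);
            System.out.println("Loaded bootstrap metadata.version " + parsed);
        } catch (IllegalArgumentException e) {
            // This is the failure mode the comment describes: the exception surfaces while
            // reading the bootstrap records into memory, so the unknown FeatureLevelRecord
            // never reaches FeatureControlManager#replay.
            System.err.println("Cannot load bootstrap snapshot: " + e.getMessage());
        }
    }
}
```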
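And on the downgrade-logging point, a small, self-contained sketch of the branching that comment is getting at, with placeholder message strings. It only mimics the shape of `updateMetadataVersion` (the real method returns an `ApiError` and emits records); none of this is the PR's actual code.

```java
public class DowngradeLoggingSketch {

    // Placeholder for the three outcomes of a metadata.version downgrade request.
    static String describeDowngrade(boolean metadataChanged, boolean allowUnsafeDowngrade) {
        if (!metadataChanged) {
            // Lossless downgrade: no metadata record formats change between the versions.
            return "Downgrading metadata.version (no metadata format change).";
        } else if (allowUnsafeDowngrade) {
            // Lossy downgrade explicitly requested: log that it is unsafe.
            return "Downgrading metadata.version unsafely (metadata may be lost).";
        } else {
            // The "message just below" from the review comment: if unsafe downgrades are
            // now allowed via the flag, this rejection text should mention that option.
            return "Refusing lossy metadata.version downgrade; retry with unsafe downgrade enabled.";
        }
    }

    public static void main(String[] args) {
        System.out.println(describeDowngrade(false, false)); // safe downgrade
        System.out.println(describeDowngrade(true, true));   // unsafe downgrade, explicitly allowed
        System.out.println(describeDowngrade(true, false));  // lossy downgrade without the flag
    }
}
```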
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org