kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r463935718
########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig, } } + /** + * Returns the new FinalizedVersionRange for the feature, if there are no feature + * incompatibilities seen with all known brokers for the provided feature update. + * Otherwise returns a suitable error. + * + * @param update the feature update to be processed (this can not be meant to delete the feature) + * + * @return the new FinalizedVersionRange or error, as described above. + */ + private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = { + if (UpdateFeaturesRequest.isDeleteRequest(update)) { + throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update") + } + // NOTE: Below we set the finalized min version level to be the default minimum version + // level. If the finalized feature already exists, then, this can cause deprecation of all + // version levels in the closed range: + // [existingVersionRange.min(), defaultMinVersionLevel - 1]. 
+ val defaultMinVersionLevel = brokerFeatures.defaultMinVersionLevel(update.feature) + val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, update.maxVersionLevel) + val numIncompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.count(broker => { + val singleFinalizedFeature = + Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, newVersionRange))) + BrokerFeatures.hasIncompatibleFeatures(broker.features, singleFinalizedFeature) + }) + if (numIncompatibleBrokers == 0) { + Left(newVersionRange) + } else { + Right( + new ApiError(Errors.INVALID_REQUEST, + s"Could not apply finalized feature update because $numIncompatibleBrokers" + + " brokers were found to have incompatible features.")) + } + } + + /** + * Validate and process a finalized feature update on an existing FinalizedVersionRange for the + * feature. + * + * If the processing is successful, then, the return value contains: + * 1. the new FinalizedVersionRange for the feature, if the feature update was not meant to delete the feature. + * 2. Option.empty, if the feature update was meant to delete the feature. + * + * If the processing failed, then returned value contains a suitable ApiError. + * + * @param update the feature update to be processed. + * @param existingVersionRange the existing FinalizedVersionRange which can be empty when no + * FinalizedVersionRange exists for the associated feature + * + * @return the new FinalizedVersionRange or error, as described above. 
+ */ + private def processFeatureUpdate(update: UpdateFeaturesRequestData.FeatureUpdateKey, + existingVersionRange: Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] = { + def newVersionRangeOrError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[Option[FinalizedVersionRange], ApiError] = { + newFinalizedVersionRangeOrIncompatibilityError(update) + .fold(versionRange => Left(Some(versionRange)), error => Right(error)) + } + + if (update.feature.isEmpty) { + // Check that the feature name is not empty. + Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be empty.")) + } else { + // We handle deletion requests separately from non-deletion requests. + if (UpdateFeaturesRequest.isDeleteRequest(update)) { + if (existingVersionRange.isEmpty) { + // Disallow deletion of a non-existing finalized feature. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not delete non-existing finalized feature: '${update.feature}'")) + } else { + Left(Option.empty) + } + } else if (update.maxVersionLevel() < 1) { Review comment: A value < 1 indicates a deletion request (not purely a downgrade request). We handle this case explicitly here so that we can generate a more specific error message: `...less than 1 for feature...`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org