kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463936048
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig,
}
}
+ /**
+ * Returns the new FinalizedVersionRange for the feature, if there are no
feature
+ * incompatibilities seen with all known brokers for the provided feature
update.
+ * Otherwise returns a suitable error.
+ *
+ * @param update the feature update to be processed (this can not be meant
to delete the feature)
+ *
+ * @return the new FinalizedVersionRange or error, as described
above.
+ */
+ private def newFinalizedVersionRangeOrIncompatibilityError(update:
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange,
ApiError] = {
+ if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+ throw new IllegalArgumentException(s"Provided feature update can not be
meant to delete the feature: $update")
+ }
+ // NOTE: Below we set the finalized min version level to be the default
minimum version
+ // level. If the finalized feature already exists, then, this can cause
deprecation of all
+ // version levels in the closed range:
+ // [existingVersionRange.min(), defaultMinVersionLevel - 1].
+ val defaultMinVersionLevel =
brokerFeatures.defaultMinVersionLevel(update.feature)
+ val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel,
update.maxVersionLevel)
+ val numIncompatibleBrokers =
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+ val singleFinalizedFeature =
+ Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature,
newVersionRange)))
+ BrokerFeatures.hasIncompatibleFeatures(broker.features,
singleFinalizedFeature)
+ })
+ if (numIncompatibleBrokers == 0) {
+ Left(newVersionRange)
+ } else {
+ Right(
+ new ApiError(Errors.INVALID_REQUEST,
+ s"Could not apply finalized feature update because
$numIncompatibleBrokers" +
+ " brokers were found to have incompatible features."))
+ }
+ }
+
+ /**
+ * Validate and process a finalized feature update on an existing
FinalizedVersionRange for the
+ * feature.
+ *
+ * If the processing is successful, then, the return value contains:
+ * 1. the new FinalizedVersionRange for the feature, if the feature update
was not meant to delete the feature.
+ * 2. Option.empty, if the feature update was meant to delete the feature.
+ *
+ * If the processing failed, then returned value contains a suitable
ApiError.
+ *
+ * @param update the feature update to be processed.
+ * @param existingVersionRange the existing FinalizedVersionRange which
can be empty when no
+ * FinalizedVersionRange exists for the
associated feature
+ *
+ * @return the new FinalizedVersionRange or error, as
described above.
+ */
+ private def processFeatureUpdate(update:
UpdateFeaturesRequestData.FeatureUpdateKey,
+ existingVersionRange:
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError]
= {
+ def newVersionRangeOrError(update:
UpdateFeaturesRequestData.FeatureUpdateKey):
Either[Option[FinalizedVersionRange], ApiError] = {
+ newFinalizedVersionRangeOrIncompatibilityError(update)
+ .fold(versionRange => Left(Some(versionRange)), error => Right(error))
+ }
+
+ if (update.feature.isEmpty) {
+ // Check that the feature name is not empty.
+ Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be
empty."))
+ } else {
+ // We handle deletion requests separately from non-deletion requests.
+ if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+ if (existingVersionRange.isEmpty) {
+ // Disallow deletion of a non-existing finalized feature.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not delete non-existing finalized feature:
'${update.feature}'"))
+ } else {
+ Left(Option.empty)
+ }
+ } else if (update.maxVersionLevel() < 1) {
+ // Disallow deletion of a finalized feature without allowDowngrade
flag set.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not provide maxVersionLevel:
${update.maxVersionLevel} less" +
+ s" than 1 for feature: '${update.feature}' without
setting the" +
+ " allowDowngrade flag to true in the request."))
+ } else {
+ existingVersionRange.map(existing =>
+ if (update.maxVersionLevel == existing.max) {
+ // Disallow a case where target maxVersionLevel matches existing
maxVersionLevel.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not ${if (update.allowDowngrade) "downgrade" else
"upgrade"}" +
+ s" a finalized feature: '${update.feature}' from existing" +
+ s" maxVersionLevel:${existing.max} to the same value."))
+ } else if (update.maxVersionLevel < existing.max &&
!update.allowDowngrade) {
+ // Disallow downgrade of a finalized feature without the
allowDowngrade flag set.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not downgrade finalized feature: '${update.feature}' from"
+
+ s" existing maxVersionLevel:${existing.max} to provided" +
+ s" maxVersionLevel:${update.maxVersionLevel} without setting
the" +
+ " allowDowngrade flag in the request."))
+ } else if (update.allowDowngrade && update.maxVersionLevel >
existing.max) {
+ // Disallow a request that sets allowDowngrade flag without
specifying a
+ // maxVersionLevel that's lower than the existing maxVersionLevel.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"When finalized feature: '${update.feature}' has the
allowDowngrade" +
+ " flag set in the request, the provided" +
+ s" maxVersionLevel:${update.maxVersionLevel} can not be
greater than" +
+ s" existing maxVersionLevel:${existing.max}."))
+ } else if (update.maxVersionLevel() < existing.min) {
+ // Disallow downgrade of a finalized feature below the existing
finalized
+ // minVersionLevel.
+ Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not downgrade finalized feature: '${update.feature}' to" +
+ s" maxVersionLevel:${update.maxVersionLevel} because it's
lower than" +
+ s" the existing minVersionLevel:${existing.min}."))
+ } else {
+ newVersionRangeOrError(update)
+ }
+ ).getOrElse(newVersionRangeOrError(update))
+ }
+ }
+ }
+
+ private def processFeatureUpdates(request: UpdateFeaturesRequest,
+ callback: UpdateFeaturesCallback): Unit = {
+ if (isActive) {
+ processFeatureUpdatesWithActiveController(request, callback)
+ } else {
+ val results = request.data().featureUpdates().asScala.map {
+ update => update.feature() -> new ApiError(Errors.NOT_CONTROLLER)
+ }.toMap
+ callback(results)
+ }
+ }
+
+ private def processFeatureUpdatesWithActiveController(request:
UpdateFeaturesRequest,
+ callback:
UpdateFeaturesCallback): Unit = {
+ val updates = request.data.featureUpdates
+ val existingFeatures = featureCache.get
+ .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala)
+ .getOrElse(Map[String, FinalizedVersionRange]())
+ // Map of feature to FinalizedVersionRange. This contains the target
features to be eventually
+ // written to FeatureZNode.
+ val targetFeatures = scala.collection.mutable.Map[String,
FinalizedVersionRange]() ++ existingFeatures
+ // Map of feature to error.
+ var errors = scala.collection.mutable.Map[String, ApiError]()
+
+ // Process each FeatureUpdate.
+ // If a FeatureUpdate is found to be valid, then the corresponding entry
in errors would contain
+ // Errors.NONE. Otherwise the entry would contain the appropriate error.
+ updates.asScala.iterator.foreach { update =>
+ processFeatureUpdate(update, existingFeatures.get(update.feature()))
match {
+ case Left(newVersionRangeOrNone) =>
+ newVersionRangeOrNone
+ .map(newVersionRange => targetFeatures += (update.feature() ->
newVersionRange))
+ .getOrElse(targetFeatures -= update.feature())
+ errors += (update.feature() -> new ApiError(Errors.NONE))
+ case Right(featureUpdateFailureReason) =>
+ errors += (update.feature() -> featureUpdateFailureReason)
+ }
+ }
+
+ if (existingFeatures.equals(targetFeatures)) {
Review comment:
Done.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]