abbccdda commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r496901588
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.Objects; + +/** + * Represents a range of version levels supported by every broker in a cluster for some feature. + */ +public class FinalizedVersionRange { Review comment: Do we want to have a different name from `org.apache.kafka.common.feature.FinalizedVersionRange`, such as `FinalizedVersionLevels`? Same case for `SupportedVersionRange`, personally I feel the same class name makes the navigation harder. ########## File path: core/src/main/scala/kafka/server/BrokerFeatures.scala ########## @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange} +import org.apache.kafka.common.feature.Features._ + +import scala.jdk.CollectionConverters._ + +/** + * A class that encapsulates the latest features supported by the Broker and also provides APIs to + * check for incompatibilities between the features supported by the Broker and finalized features. + * The class also enables feature version level deprecation, as explained below. This class is + * immutable in production. It provides few APIs to mutate state only for the purpose of testing. + * + * Feature version level deprecation: + * ================================== + * + * Deprecation of certain version levels of a feature is a process to stop supporting the + * functionality offered by the feature at those version levels, across the entire Kafka cluster. + * Feature version deprecation is a simple 2-step process explained below. In each step below, an + * example is provided to help understand the process better: + * + * STEP 1: + * ======= + * + * In the first step, a major Kafka release is made with a Broker code change (explained later + * below) that establishes the intent to deprecate certain versions of one or more features + * cluster-wide. When this new Kafka release is deployed to the cluster, deprecated finalized + * feature versions are no longer advertised to the client, but they can still be used by existing + * connections. The way it works is that the feature versioning system (via the controller) will + * automatically persist the new minVersionLevel for the feature in ZK to propagate the deprecation + * of certain versions. After this happens, any external client that queries the Broker to learn the + * feature versions will at some point start to see the new value for the finalized minVersionLevel + * for the feature. The external clients are expected to stop using the deprecated versions at least + * by the time that they learn about it. + * + * Here is how the above code change needs to be done: + * In order to deprecate feature version levels, in the supportedFeatures map you need to supply a + * specific firstActiveVersion value that's higher than the minVersion for the feature. The + * value for firstActiveVersion should be 1 beyond the highest version that you intend to deprecate + * for that feature. Whenever the controller is elected or the features are finalized via the + * ApiKeys.UPDATE_FEATURES api, the feature version levels in the closed range: + * [minVersion, firstActiveVersion - 1] are automatically deprecated in ZK by the controller logic. + * + * Example: + * - Let us assume the existing finalized feature in ZK: + * { + * "feature" -> FinalizedVersionRange(minVersionLevel=1, maxVersionLevel=5) + * } + * Now, supposing you would like to deprecate feature version levels: [1, 2]. + * Then, in the supportedFeatures map you should supply the following: + * supportedFeatures = { + * "feature" -> SupportedVersionRange(minVersion=1, firstActiveVersion=3, maxVersion=5) + * } + * - If you do NOT want to deprecate a version level for a feature, then, in the supportedFeatures Review comment: I think we could just make the `firstActiveVersion = minVersion` by default, to avoid the requirement for configuring firstActiveVersion ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -20,26 +20,31 @@ package kafka.server import kafka.utils.Logging import org.apache.kafka.common.feature.{Features, FinalizedVersionRange} +import scala.concurrent.TimeoutException +import scala.math.max + // Raised whenever there was an error in updating the FinalizedFeatureCache with features. class FeatureCacheUpdateException(message: String) extends RuntimeException(message) { } // Helper class that represents finalized features along with an epoch value. -case class FinalizedFeaturesAndEpoch(features: Features[FinalizedVersionRange], epoch: Int) { +case class FinalizedFeaturesAndEpoch(features: Features[FinalizedVersionRange], epoch: Long) { Review comment: So we are saving the ZK epoch in a long, which was supposed to be an int field? ########## File path: core/src/main/scala/kafka/server/BrokerFeatures.scala ########## @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange} +import org.apache.kafka.common.feature.Features._ + +import scala.jdk.CollectionConverters._ + +/** + * A class that encapsulates the latest features supported by the Broker and also provides APIs to + * check for incompatibilities between the features supported by the Broker and finalized features. + * The class also enables feature version level deprecation, as explained below. This class is + * immutable in production. It provides few APIs to mutate state only for the purpose of testing. + * + * Feature version level deprecation: + * ================================== + * + * Deprecation of certain version levels of a feature is a process to stop supporting the + * functionality offered by the feature at those version levels, across the entire Kafka cluster. + * Feature version deprecation is a simple 2-step process explained below. In each step below, an + * example is provided to help understand the process better: + * + * STEP 1: + * ======= + * + * In the first step, a major Kafka release is made with a Broker code change (explained later + * below) that establishes the intent to deprecate certain versions of one or more features + * cluster-wide. When this new Kafka release is deployed to the cluster, deprecated finalized + * feature versions are no longer advertised to the client, but they can still be used by existing + * connections. The way it works is that the feature versioning system (via the controller) will + * automatically persist the new minVersionLevel for the feature in ZK to propagate the deprecation + * of certain versions. After this happens, any external client that queries the Broker to learn the + * feature versions will at some point start to see the new value for the finalized minVersionLevel + * for the feature. The external clients are expected to stop using the deprecated versions at least + * by the time that they learn about it. + * + * Here is how the above code change needs to be done: + * In order to deprecate feature version levels, in the supportedFeatures map you need to supply a + * specific firstActiveVersion value that's higher than the minVersion for the feature. The + * value for firstActiveVersion should be 1 beyond the highest version that you intend to deprecate + * for that feature. Whenever the controller is elected or the features are finalized via the + * ApiKeys.UPDATE_FEATURES api, the feature version levels in the closed range: + * [minVersion, firstActiveVersion - 1] are automatically deprecated in ZK by the controller logic. + * + * Example: + * - Let us assume the existing finalized feature in ZK: + * { + * "feature" -> FinalizedVersionRange(minVersionLevel=1, maxVersionLevel=5) + * } + * Now, supposing you would like to deprecate feature version levels: [1, 2]. + * Then, in the supportedFeatures map you should supply the following: + * supportedFeatures = { + * "feature" -> SupportedVersionRange(minVersion=1, firstActiveVersion=3, maxVersion=5) + * } + * - If you do NOT want to deprecate a version level for a feature, then, in the supportedFeatures + * map you should supply the firstActiveVersion to be the same as the minVersion supplied for that + * feature. + * Example: + * supportedFeatures = { + * "feature" -> SupportedVersionRange(minVersion=1, firstActiveVersion=1, maxVersion=5) + * } + * The above indicates no intent to deprecate any version levels for the feature. + * + * STEP 2: + * ======= + * + * After the first step is over, you may (at some point) want to permanently remove the code/logic + * for the functionality offered by the deprecated feature versions. This is the second step. Here a + * subsequent major Kafka release is made with another Broker code change that removes the code for + * the functionality offered by the deprecated feature versions. This would completely drop support + * for the deprecated versions. Such a code change needs to be supplemented by supplying a + * suitable higher minVersion value for the feature in the supportedFeatures map. + * Example: + * - In the example above in step 1, we showed how to deprecate version levels [1, 2] for + * "feature". Now let us assume the following finalized feature in ZK (after the deprecation + * has been carried out): + * { + * "feature" -> FinalizedVersionRange(minVersionLevel=3, maxVersionLevel=5) + * } + * Now, supposing you would like to permanently remove support for feature versions: [1, 2]. + * Then, in the supportedFeatures map you should now supply the following: Review comment: Similar here to make `firstActiveVersion = minVersion` as default. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org