cmccabe commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r875070799
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java:
##########
@@ -17,32 +17,94 @@
package org.apache.kafka.controller;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.server.common.MetadataVersion;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.stream.Collectors;
/**
- * A holder class of the local node's supported feature flags.
+ * A holder class of the local node's supported feature flags as well as the
ApiVersions of other nodes.
*/
public class QuorumFeatures {
private final int nodeId;
+ private final ApiVersions apiVersions;
private final Map<String, VersionRange> supportedFeatures;
+ private final List<Integer> quorumNodeIds;
- QuorumFeatures(int nodeId,
- Map<String, VersionRange> supportedFeatures) {
+ QuorumFeatures(
+ int nodeId,
+ ApiVersions apiVersions,
+ Map<String, VersionRange> supportedFeatures,
+ List<Integer> quorumNodeIds
+ ) {
this.nodeId = nodeId;
+ this.apiVersions = apiVersions;
this.supportedFeatures =
Collections.unmodifiableMap(supportedFeatures);
+ this.quorumNodeIds = Collections.unmodifiableList(quorumNodeIds);
}
- public static QuorumFeatures create(int nodeId,
- Map<String, VersionRange>
supportedFeatures) {
- return new QuorumFeatures(nodeId, supportedFeatures);
+ public static QuorumFeatures create(
+ int nodeId,
+ ApiVersions apiVersions,
+ Map<String, VersionRange> supportedFeatures,
+ Collection<Node> quorumNodes
+ ) {
+ List<Integer> nodeIds =
quorumNodes.stream().map(Node::id).collect(Collectors.toList());
+ return new QuorumFeatures(nodeId, apiVersions, supportedFeatures,
nodeIds);
}
public static Map<String, VersionRange> defaultFeatureMap() {
- return Collections.emptyMap();
+ Map<String, VersionRange> features = new HashMap<>(1);
+ features.put(MetadataVersion.FEATURE_NAME,
VersionRange.of(MetadataVersion.IBP_3_0_IV0.featureLevel(),
MetadataVersion.latest().featureLevel()));
+ return features;
+ }
+
+ Optional<VersionRange> quorumSupportedFeature(String featureName) {
+ List<VersionRange> supportedVersions = new
ArrayList<>(quorumNodeIds.size());
+ for (int nodeId : quorumNodeIds) {
+ if (nodeId == this.nodeId) {
Review Comment:
What's the rationale for excluding our own `nodeId` here? I guess the idea
is that it won't ever be found in `apiVersions`?
It would be good to have an INFO message if we only have version ranges from
less than N peers (where N = total number of peers)
And, should we return `Optional.empty` if we only have version ranges from
less than half of the peers? Or throw an exception or something? I suppose we
wouldn't expect to be able to commit in this scenario, but...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]