jsancio commented on code in PR #19545: URL: https://github.com/apache/kafka/pull/19545#discussion_r2059108995
########## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java: ########## @@ -66,48 +66,48 @@ public static BootstrapMetadata fromVersions( setFeatureLevel(level), (short) 0)); } } - return new BootstrapMetadata(records, metadataVersion, source); + return new BootstrapMetadata(records, metadataVersion.featureLevel(), source); } public static BootstrapMetadata fromVersion(MetadataVersion metadataVersion, String source) { List<ApiMessageAndVersion> records = List.of( new ApiMessageAndVersion(new FeatureLevelRecord(). setName(MetadataVersion.FEATURE_NAME). setFeatureLevel(metadataVersion.featureLevel()), (short) 0)); - return new BootstrapMetadata(records, metadataVersion, source); + return new BootstrapMetadata(records, metadataVersion.featureLevel(), source); } public static BootstrapMetadata fromRecords(List<ApiMessageAndVersion> records, String source) { - MetadataVersion metadataVersion = null; + Optional<Short> metadataVersionLevel = Optional.empty(); for (ApiMessageAndVersion record : records) { - Optional<MetadataVersion> version = recordToMetadataVersion(record.message()); - if (version.isPresent()) { - metadataVersion = version.get(); + Optional<Short> level = recordToMetadataVersionLevel(record.message()); + if (level.isPresent()) { + metadataVersionLevel = level; } } - if (metadataVersion == null) { + if (!metadataVersionLevel.isPresent()) { throw new RuntimeException("No FeatureLevelRecord for " + MetadataVersion.FEATURE_NAME + " was found in the bootstrap metadata from " + source); } - return new BootstrapMetadata(records, metadataVersion, source); + return new BootstrapMetadata(records, metadataVersionLevel.get(), source); } - public static Optional<MetadataVersion> recordToMetadataVersion(ApiMessage record) { + public static Optional<Short> recordToMetadataVersionLevel(ApiMessage record) { if (record instanceof FeatureLevelRecord featureLevel) { if (featureLevel.name().equals(MetadataVersion.FEATURE_NAME)) { - return Optional.of(MetadataVersion.fromFeatureLevel(featureLevel.featureLevel())); + return Optional.of(featureLevel.featureLevel()); Review Comment: Other records are scram records take a look at KIP-900: https://cwiki.apache.org/confluence/x/f49bDg A new cluster needs to copy all of the records in `bootstrap.checkpoint` to the cluster metadata partition. After a cluster has done that for the first time it doesn't need to read or process the bootstrap checkpoint again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org