apoorvmittal10 commented on code in PR #19659: URL: https://github.com/apache/kafka/pull/19659#discussion_r2087178807
########## core/src/main/java/kafka/server/share/SharePartitionManager.java: ########## @@ -746,6 +747,29 @@ private Consumer<Set<String>> failedShareAcknowledgeMetricsHandler() { }; } + /** + * The handler for share version feature metadata changes. + * @param shareVersion the new share version feature + */ + public void onShareVersionToggle(ShareVersion shareVersion) { + if (!shareVersion.supportsShareGroups()) { + // Remove all share sessions from share session cache. + synchronized (cache) { Review Comment: `removeAllSessions` is already synchronized in `cache`, so why do we need `synchronized` here? ########## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala: ########## @@ -232,6 +240,23 @@ class BrokerMetadataPublisher( if (_firstPublish) { finishInitializingReplicaManager() } + + if (delta.featuresDelta != null) { + try { + val newFinalizedFeatures = new FinalizedFeatures(newImage.features.metadataVersionOrThrow, newImage.features.finalizedVersions, newImage.provenance.lastContainedOffset) + // Share version feature has been toggled. + if (newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort) != finalizedShareVersion) { + finalizedShareVersion = newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort) Review Comment: `newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort)` is evaluated twice on consecutive lines. Can't we have a `val` to record this? ########## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala: ########## @@ -232,6 +240,23 @@ class BrokerMetadataPublisher( if (_firstPublish) { finishInitializingReplicaManager() } + + if (delta.featuresDelta != null) { + try { + val newFinalizedFeatures = new FinalizedFeatures(newImage.features.metadataVersionOrThrow, newImage.features.finalizedVersions, newImage.provenance.lastContainedOffset) + // Share version feature has been toggled. 
+ if (newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort) != finalizedShareVersion) { + finalizedShareVersion = newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort) + val shareVersion: ShareVersion = ShareVersion.fromFeatureLevel(finalizedShareVersion) + info(s"Feature share.version has been updated to version $finalizedShareVersion") + sharePartitionManager.onShareVersionToggle(shareVersion) + } + } catch { + case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating share partition manager " + + s" with share version feature change in $delta", t) Review Comment: Do you need to print the complete `delta`? ########## core/src/main/scala/kafka/server/BrokerServer.scala: ########## @@ -516,7 +522,8 @@ class BrokerServer( authorizerPlugin.toJava ), sharedServer.initialBrokerMetadataLoadFaultHandler, - sharedServer.metadataPublishingFaultHandler + sharedServer.metadataPublishingFaultHandler, + sharePartitionManager Review Comment: Why not pass the handler, as is already done in the method? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org