adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2021161644


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -267,8 +268,20 @@ public CompletableFuture<Map<TopicIdPartition, PartitionData>> fetchMessages(
             .rotate(topicIdPartitions, new PartitionRotateMetadata(sessionEpoch));
 
         CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
-        processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, rotatedTopicIdPartitions, batchSize, maxFetchRecords, brokerTopicStats));
-
+        if (groupConfigManager.groupConfig(groupId).isEmpty()) {
+            processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, rotatedTopicIdPartitions, batchSize, maxFetchRecords, brokerTopicStats));
+        } else {
+            FetchParams updatedFetchParams = new FetchParams(
+                fetchParams.replicaId,
+                fetchParams.replicaEpoch,
+                fetchParams.maxWaitMs,
+                fetchParams.minBytes,
+                fetchParams.maxBytes,
+                FetchIsolation.of(-1, groupConfigManager.groupConfig(groupId).get().shareIsolationLevel()),

Review Comment:
   @AndrewJSchofield, I had earlier considered passing `isShareFetchRequest` as a 
   parameter to `FetchIsolation.of`. That idea did not work out, as per the 
   discussion at https://github.com/apache/kafka/pull/19261#discussion_r2012532595. 
   Do you think I should instead add a new constant like the one below in 
   `FetchRequest.java` or `ShareFetchRequest.java`?
   `public static final int SHARE_CONSUMER_ID = -4;`
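
   For illustration only, here is a self-contained sketch of how such a sentinel 
   could flow through an isolation lookup. The class, enums, and `of` method are 
   simplified stand-ins and not the actual Kafka classes. `isShareConsumer` and 
   the rule that negative ids denote consumers are assumptions made for this 
   sketch.

```java
// Simplified stand-ins for illustration; these are NOT the real Kafka classes.
final class ShareIsolationSketch {

    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    enum Isolation { LOG_END, HIGH_WATERMARK, TXN_COMMITTED }

    // The diff above passes -1 as the replica id for consumer fetches.
    static final int CONSUMER_REPLICA_ID = -1;

    // Hypothetical sentinel proposed in this comment.
    static final int SHARE_CONSUMER_ID = -4;

    // Assumption: negative ids denote consumers, non-negative ids denote replicas.
    static boolean isConsumer(int replicaId) {
        return replicaId < 0;
    }

    // Hypothetical helper: lets callers tell share fetches apart without a boolean flag.
    static boolean isShareConsumer(int replicaId) {
        return replicaId == SHARE_CONSUMER_ID;
    }

    // Picks the isolation from the replica id and the configured isolation level.
    static Isolation of(int replicaId, IsolationLevel level) {
        if (!isConsumer(replicaId)) {
            return Isolation.LOG_END;
        }
        if (level == IsolationLevel.READ_COMMITTED) {
            return Isolation.TXN_COMMITTED;
        }
        return Isolation.HIGH_WATERMARK;
    }

    public static void main(String[] args) {
        // A share fetch would pass SHARE_CONSUMER_ID instead of a hard-coded -1.
        System.out.println(of(SHARE_CONSUMER_ID, IsolationLevel.READ_COMMITTED));
        System.out.println(of(SHARE_CONSUMER_ID, IsolationLevel.READ_UNCOMMITTED));
    }
}
```

   In this sketch the sentinel is still treated as a consumer id, so the isolation 
   result matches what `-1` would give, while `isShareConsumer` leaves room for 
   share-specific handling elsewhere.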


