apoorvmittal10 commented on code in PR #19148:
URL: https://github.com/apache/kafka/pull/19148#discussion_r1986980330


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -427,29 +426,20 @@ private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Par
     /**
      * The newContext method is used to create a new share fetch context for every share fetch request.
      * @param groupId The group id in the share fetch request.
-     * @param shareFetchData The topic-partitions and their corresponding maxBytes data in the share fetch request.
+     * @param shareFetchData The topic-partitions in the share fetch request.
      * @param toForget The topic-partitions to forget present in the share fetch request.
      * @param reqMetadata The metadata in the share fetch request.
      * @param isAcknowledgeDataPresent This tells whether the fetch request received includes piggybacked acknowledgements or not
      * @return The new share fetch context object
      */
-    public ShareFetchContext newContext(String groupId, Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData,
+    public ShareFetchContext newContext(String groupId, List<TopicIdPartition> shareFetchData,
                                         List<TopicIdPartition> toForget, ShareRequestMetadata reqMetadata, Boolean isAcknowledgeDataPresent) {
         ShareFetchContext context;
-        // TopicPartition with maxBytes as 0 should not be added in the cachedPartitions
-        Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchDataWithMaxBytes = new HashMap<>();
-        shareFetchData.forEach((tp, sharePartitionData) -> {
-            if (sharePartitionData.maxBytes > 0) shareFetchDataWithMaxBytes.put(tp, sharePartitionData);
-        });
         // If the request's epoch is FINAL_EPOCH or INITIAL_EPOCH, we should remove the existing sessions. Also, start a
         // new session in case it is INITIAL_EPOCH. Hence, we need to treat them as special cases.
         if (reqMetadata.isFull()) {
             ShareSessionKey key = shareSessionKey(groupId, reqMetadata.memberId());
             if (reqMetadata.epoch() == ShareRequestMetadata.FINAL_EPOCH) {
-                // If the epoch is FINAL_EPOCH, don't try to create a new session.
-                if (!shareFetchDataWithMaxBytes.isEmpty()) {
-                    throw Errors.INVALID_REQUEST.exception();
-                }

Review Comment:
   Sounds good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

