adixitconfluent commented on code in PR #19148:
URL: https://github.com/apache/kafka/pull/19148#discussion_r1986736109


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -427,29 +426,20 @@ private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Par
     /**
      * The newContext method is used to create a new share fetch context for every share fetch request.
      * @param groupId The group id in the share fetch request.
-     * @param shareFetchData The topic-partitions and their corresponding maxBytes data in the share fetch request.
+     * @param shareFetchData The topic-partitions in the share fetch request.
      * @param toForget The topic-partitions to forget present in the share fetch request.
      * @param reqMetadata The metadata in the share fetch request.
      * @param isAcknowledgeDataPresent This tells whether the fetch request received includes piggybacked acknowledgements or not
      * @return The new share fetch context object
      */
-    public ShareFetchContext newContext(String groupId, Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData,
+    public ShareFetchContext newContext(String groupId, List<TopicIdPartition> shareFetchData,
                                         List<TopicIdPartition> toForget, ShareRequestMetadata reqMetadata, Boolean isAcknowledgeDataPresent) {
         ShareFetchContext context;
-        // TopicPartition with maxBytes as 0 should not be added in the cachedPartitions
-        Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchDataWithMaxBytes = new HashMap<>();
-        shareFetchData.forEach((tp, sharePartitionData) -> {
-            if (sharePartitionData.maxBytes > 0) shareFetchDataWithMaxBytes.put(tp, sharePartitionData);
-        });
         // If the request's epoch is FINAL_EPOCH or INITIAL_EPOCH, we should remove the existing sessions. Also, start a
         // new session in case it is INITIAL_EPOCH. Hence, we need to treat them as special cases.
         if (reqMetadata.isFull()) {
             ShareSessionKey key = shareSessionKey(groupId, reqMetadata.memberId());
             if (reqMetadata.epoch() == ShareRequestMetadata.FINAL_EPOCH) {
-                // If the epoch is FINAL_EPOCH, don't try to create a new session.
-                if (!shareFetchDataWithMaxBytes.isEmpty()) {
-                    throw Errors.INVALID_REQUEST.exception();
-                }

Review Comment:
   When we get a `ShareFetch` request with `FINAL_EPOCH`, it **can** in
principle include some topic partitions purely for acknowledgement purposes.
   In the current code, a `FINAL_EPOCH` request is always a `ShareAcknowledge`
request. However, I discussed this with @AndrewJSchofield, and he mentioned
that if a developer later decides to send a `ShareFetch` request with
`FINAL_EPOCH`, we should be able to support it. With the current code, if we
receive such a `ShareFetch` request with some topic partitions, we only process
acknowledgements for those topic partitions and do not fetch any records for
them. I have verified this behaviour.
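
To make the intended behaviour concrete, here is a minimal, self-contained sketch. It is not the broker code: `FinalEpochSketch`, `partitionsToFetch`, the string partition names and the `-1` sentinel are all hypothetical stand-ins for illustration. It only shows the idea that a `FINAL_EPOCH` request carrying topic partitions is accepted rather than rejected, and that nothing is fetched for those partitions.

```java
import java.util.List;

class FinalEpochSketch {
    // Hypothetical stand-in for ShareRequestMetadata.FINAL_EPOCH (value chosen for illustration).
    static final int FINAL_EPOCH = -1;

    /**
     * Returns the partitions that would be fetched for a request.
     * Assumption: on FINAL_EPOCH the listed partitions are used for
     * acknowledgements only, so the fetch set is empty and no error is thrown.
     */
    static List<String> partitionsToFetch(int epoch, List<String> requestedPartitions) {
        if (epoch == FINAL_EPOCH) {
            // Partitions may be present, but only their acknowledgements are processed.
            return List.of();
        }
        return requestedPartitions;
    }

    public static void main(String[] args) {
        List<String> requested = List.of("topicA-0", "topicA-1");
        System.out.println("final epoch fetch set: " + partitionsToFetch(FINAL_EPOCH, requested));
        System.out.println("regular epoch fetch set: " + partitionsToFetch(5, requested));
    }
}
```

The key difference from the removed check is that a non-empty partition list on `FINAL_EPOCH` no longer produces `INVALID_REQUEST`.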


