adixitconfluent commented on code in PR #19148: URL: https://github.com/apache/kafka/pull/19148#discussion_r1986736109
########## core/src/main/java/kafka/server/share/SharePartitionManager.java: ########## @@ -427,29 +426,20 @@ private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Par /** * The newContext method is used to create a new share fetch context for every share fetch request. * @param groupId The group id in the share fetch request. - * @param shareFetchData The topic-partitions and their corresponding maxBytes data in the share fetch request. + * @param shareFetchData The topic-partitions in the share fetch request. * @param toForget The topic-partitions to forget present in the share fetch request. * @param reqMetadata The metadata in the share fetch request. * @param isAcknowledgeDataPresent This tells whether the fetch request received includes piggybacked acknowledgements or not * @return The new share fetch context object */ - public ShareFetchContext newContext(String groupId, Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData, + public ShareFetchContext newContext(String groupId, List<TopicIdPartition> shareFetchData, List<TopicIdPartition> toForget, ShareRequestMetadata reqMetadata, Boolean isAcknowledgeDataPresent) { ShareFetchContext context; - // TopicPartition with maxBytes as 0 should not be added in the cachedPartitions - Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchDataWithMaxBytes = new HashMap<>(); - shareFetchData.forEach((tp, sharePartitionData) -> { - if (sharePartitionData.maxBytes > 0) shareFetchDataWithMaxBytes.put(tp, sharePartitionData); - }); // If the request's epoch is FINAL_EPOCH or INITIAL_EPOCH, we should remove the existing sessions. Also, start a // new session in case it is INITIAL_EPOCH. Hence, we need to treat them as special cases. if (reqMetadata.isFull()) { ShareSessionKey key = shareSessionKey(groupId, reqMetadata.memberId()); if (reqMetadata.epoch() == ShareRequestMetadata.FINAL_EPOCH) { - // If the epoch is FINAL_EPOCH, don't try to create a new session. - if (!shareFetchDataWithMaxBytes.isEmpty()) { - throw Errors.INVALID_REQUEST.exception(); - } Review Comment: when we get a `ShareFetch` requests with a `FINAL_EPOCH`, it **can** ideally have some topic partitions only for acknowledgement purpose. In the current code, the `FINAL_EPOCH` request is always a `ShareAcknowledge` request. However, I had a discussion with @AndrewJSchofield, he mentioned that for the future, if any dev decides to have a `ShareFetch` request with a `FINAL_EPOCH`, we should be able to support that. In the current code, if we receive a `ShareFetch` request with some topic partitions, we will be only doing acknowledgements for those topics partitions and we won't be doing any fetch for those topic partitions. I have verified this behaviour. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org