adixitconfluent commented on code in PR #16263: URL: https://github.com/apache/kafka/pull/16263#discussion_r1637853613
########## core/src/main/java/kafka/server/share/SharePartitionManager.java: ########## @@ -191,6 +208,97 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part return future; } + public ShareFetchContext newContext(String groupId, Map<TopicIdPartition, + ShareFetchRequest.SharePartitionData> shareFetchData, List<TopicIdPartition> toForget, ShareFetchMetadata reqMetadata) { + ShareFetchContext context; + // TopicPartition with maxBytes as 0 should not be added in the cachedPartitions + Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchDataWithMaxBytes = new HashMap<>(); + shareFetchData.forEach((tp, sharePartitionData) -> { + if (sharePartitionData.maxBytes > 0) shareFetchDataWithMaxBytes.put(tp, sharePartitionData); + }); + // If the request's epoch is FINAL_EPOCH or INITIAL_EPOCH, we should remove the existing sessions. Also, start a + // new session in case it is INITIAL_EPOCH. Hence, we need to treat them as special cases. + if (reqMetadata.isFull()) { + ShareSessionKey key = shareSessionKey(groupId, reqMetadata.memberId()); + if (reqMetadata.epoch() == ShareFetchMetadata.FINAL_EPOCH) { + // If the epoch is FINAL_EPOCH, don't try to create a new session. 
+ if (!shareFetchDataWithMaxBytes.isEmpty()) { + throw Errors.INVALID_REQUEST.exception(); + } + context = new FinalContext(); + synchronized (cache) { + if (cache.remove(key) != null) { + log.debug("Removed share session with key {}", key); + } + } + } else { + if (cache.remove(key) != null) { + log.debug("Removed share session with key {}", key); + } + ImplicitLinkedHashCollection<CachedSharePartition> cachedSharePartitions = new + ImplicitLinkedHashCollection<>(shareFetchDataWithMaxBytes.size()); + shareFetchDataWithMaxBytes.forEach((topicIdPartition, reqData) -> + cachedSharePartitions.mustAdd(new CachedSharePartition(topicIdPartition, reqData, false))); + ShareSessionKey responseShareSessionKey = cache.maybeCreateSession(groupId, reqMetadata.memberId(), Review Comment: Thanks for pointing it out. I have added handling for the null case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org