AndrewJSchofield commented on code in PR #16377:
URL: https://github.com/apache/kafka/pull/16377#discussion_r1644000033
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -317,6 +364,197 @@ public void close() throws Exception {
         // TODO: Provide Implementation
     }
 
+    private ShareSessionKey shareSessionKey(String groupId, Uuid memberId) {
+        return new ShareSessionKey(groupId, memberId);
+    }
+
+    private static String partitionsToLogString(Collection<TopicIdPartition> partitions) {
+        return FetchSession.partitionsToLogString(partitions, log.isTraceEnabled());
+    }
+
+    /**
+     * Recursive function to process all the fetch requests present inside the fetch queue
+     */
+    private void maybeProcessFetchQueue() {
+        if (!processFetchQueueLock.compareAndSet(false, true)) {
+            // The queue is already being processed hence avoid re-triggering.
+            return;
+        }
+
+        // Initialize the topic partitions for which the fetch should be attempted.
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
+        ShareFetchPartitionData shareFetchPartitionData = fetchQueue.poll();
+        try {
+            assert shareFetchPartitionData != null;
+            shareFetchPartitionData.topicIdPartitions.forEach(topicIdPartition -> {
+                SharePartitionKey sharePartitionKey = sharePartitionKey(
+                    shareFetchPartitionData.groupId,
+                    topicIdPartition
+                );
+                SharePartition sharePartition = partitionCacheMap.computeIfAbsent(sharePartitionKey,
+                    k -> new SharePartition(shareFetchPartitionData.groupId, topicIdPartition, maxInFlightMessages, maxDeliveryCount,
+                        recordLockDurationMs, timer, time));
+                int partitionMaxBytes = shareFetchPartitionData.partitionMaxBytes.getOrDefault(topicIdPartition, 0);
+                // Add the share partition to the list of partitions to be fetched only if we can
+                // acquire the fetch lock on it.
+                if (sharePartition.maybeAcquireFetchLock()) {
+                    // Fetching over a topic should be able to proceed if any one of the following 2 conditions are met:
+                    // 1. The fetch is to happen somewhere in between the record states cached in the share partition.
+                    //    This is because in this case we don't need to check for the partition limit for in flight messages
+                    // 2. If condition 1 is not true, then that means we will be fetching new records which haven't been cached before.
+                    //    In this case it is necessary to check if the partition limit for in flight messages has been reached.
+                    if (sharePartition.nextFetchOffset() != (sharePartition.endOffset() + 1) || sharePartition.canFetchRecords()) {

Review Comment:
   I think this logic would be better encapsulated within the SharePartition class.
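
   For illustration only, one shape the suggested encapsulation could take. The method name canAcquireRecords() is hypothetical and not part of the PR; the body reuses the accessors already called in the diff above, so the manager no longer needs to reason about offsets directly:

    // Hypothetical sketch inside SharePartition -- name and javadoc are illustrative only.
    /**
     * Records can be acquired either when the next fetch offset lies within the
     * cached record states (condition 1: no in-flight limit check required), or
     * when new, uncached records would be fetched and the partition limit for
     * in-flight messages has not yet been reached (condition 2).
     */
    public boolean canAcquireRecords() {
        if (nextFetchOffset() != (endOffset() + 1)) {
            return true;
        }
        return canFetchRecords();
    }

   The call site in maybeProcessFetchQueue() would then reduce to a single check on sharePartition.canAcquireRecords() inside the fetch-lock branch.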
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -317,6 +364,197 @@ public void close() throws Exception {
         // TODO: Provide Implementation
     }
 
+    private ShareSessionKey shareSessionKey(String groupId, Uuid memberId) {
+        return new ShareSessionKey(groupId, memberId);
+    }
+
+    private static String partitionsToLogString(Collection<TopicIdPartition> partitions) {
+        return FetchSession.partitionsToLogString(partitions, log.isTraceEnabled());
+    }
+
+    /**
+     * Recursive function to process all the fetch requests present inside the fetch queue
+     */
+    private void maybeProcessFetchQueue() {
+        if (!processFetchQueueLock.compareAndSet(false, true)) {
+            // The queue is already being processed hence avoid re-triggering.
+            return;
+        }
+
+        // Initialize the topic partitions for which the fetch should be attempted.
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
+        ShareFetchPartitionData shareFetchPartitionData = fetchQueue.poll();

Review Comment:
   I'm not convinced that this will always return an entry. Adding an entry and then calling this method are not synchronized (which is fine) but I expect there is a situation in which the added entry has already been processed before this method is called.
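
   To make the race concrete: if another invocation has already drained the entry, poll() returns null, and the assert (a no-op unless the JVM runs with -ea) would not stop a NullPointerException further down. A defensive sketch, using only the fields and helpers already shown in the diff:

    // Sketch only: guard against an already-drained queue instead of asserting.
    ShareFetchPartitionData shareFetchPartitionData = fetchQueue.poll();
    if (shareFetchPartitionData == null) {
        // Nothing left to process; release the queue-processing flag so that a
        // later enqueue can trigger processing again.
        releaseProcessFetchQueueLock();
        return;
    }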
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -317,6 +364,197 @@ public void close() throws Exception {
         // TODO: Provide Implementation
     }
 
+    private ShareSessionKey shareSessionKey(String groupId, Uuid memberId) {
+        return new ShareSessionKey(groupId, memberId);
+    }
+
+    private static String partitionsToLogString(Collection<TopicIdPartition> partitions) {
+        return FetchSession.partitionsToLogString(partitions, log.isTraceEnabled());
+    }
+
+    /**
+     * Recursive function to process all the fetch requests present inside the fetch queue
+     */
+    private void maybeProcessFetchQueue() {
+        if (!processFetchQueueLock.compareAndSet(false, true)) {
+            // The queue is already being processed hence avoid re-triggering.
+            return;
+        }
+
+        // Initialize the topic partitions for which the fetch should be attempted.
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
+        ShareFetchPartitionData shareFetchPartitionData = fetchQueue.poll();
+        try {
+            assert shareFetchPartitionData != null;
+            shareFetchPartitionData.topicIdPartitions.forEach(topicIdPartition -> {
+                SharePartitionKey sharePartitionKey = sharePartitionKey(
+                    shareFetchPartitionData.groupId,
+                    topicIdPartition
+                );
+                SharePartition sharePartition = partitionCacheMap.computeIfAbsent(sharePartitionKey,
+                    k -> new SharePartition(shareFetchPartitionData.groupId, topicIdPartition, maxInFlightMessages, maxDeliveryCount,
+                        recordLockDurationMs, timer, time));
+                int partitionMaxBytes = shareFetchPartitionData.partitionMaxBytes.getOrDefault(topicIdPartition, 0);
+                // Add the share partition to the list of partitions to be fetched only if we can
+                // acquire the fetch lock on it.
+                if (sharePartition.maybeAcquireFetchLock()) {
+                    // Fetching over a topic should be able to proceed if any one of the following 2 conditions are met:
+                    // 1. The fetch is to happen somewhere in between the record states cached in the share partition.
+                    //    This is because in this case we don't need to check for the partition limit for in flight messages
+                    // 2. If condition 1 is not true, then that means we will be fetching new records which haven't been cached before.
+                    //    In this case it is necessary to check if the partition limit for in flight messages has been reached.
+                    if (sharePartition.nextFetchOffset() != (sharePartition.endOffset() + 1) || sharePartition.canFetchRecords()) {
+                        topicPartitionData.put(
+                            topicIdPartition,
+                            new FetchRequest.PartitionData(
+                                topicIdPartition.topicId(),
+                                sharePartition.nextFetchOffset(),
+                                0,
+                                partitionMaxBytes,
+                                Optional.empty()
+                            )
+                        );
+                    } else {
+                        sharePartition.releaseFetchLock();
+                        log.info("Record lock partition limit exceeded for SharePartition with key {}, " +
+                            "cannot acquire more records", sharePartitionKey);
+                    }
+                }
+            });
+
+            if (topicPartitionData.isEmpty()) {
+                // No locks for share partitions could be acquired, so we complete the request and
+                // will re-fetch for the client in next poll.
+                shareFetchPartitionData.future.complete(Collections.emptyMap());
+                // Though if no partitions can be locked then there must be some other request which
+                // is in-flight and should release the lock. But it's safe to release the lock as
+                // the lock on share partition already exists which facilitates correct behaviour
+                // with multiple requests from queue being processed.
+                releaseProcessFetchQueueLock();
+                return;
+            }
+
+            log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
+                topicPartitionData, shareFetchPartitionData.groupId, shareFetchPartitionData.fetchParams);
+
+            replicaManager.fetchMessages(
+                shareFetchPartitionData.fetchParams,
+                CollectionConverters.asScala(
+                    topicPartitionData.entrySet().stream().map(entry ->
+                        new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
+                ),
+                QuotaFactory.UnboundedQuota$.MODULE$,
+                responsePartitionData -> {
+                    log.trace("Data successfully retrieved by replica manager: {}", responsePartitionData);
+                    List<Tuple2<TopicIdPartition, FetchPartitionData>> responseData = CollectionConverters.asJava(
+                        responsePartitionData);
+                    processFetchResponse(shareFetchPartitionData, responseData).whenComplete(
+                        (result, throwable) -> {
+                            if (throwable != null) {
+                                log.error("Error processing fetch response for share partitions", throwable);
+                                shareFetchPartitionData.future.completeExceptionally(throwable);
+                            } else {
+                                shareFetchPartitionData.future.complete(result);
+                            }
+                            // Releasing the lock to move ahead with the next request in queue.
+                            releaseFetchQueueAndPartitionsLock(shareFetchPartitionData.groupId, topicPartitionData.keySet());

Review Comment:
   Hmm. Doesn't this mean only a single fetch at a time is possible for the entire SharePartitionManager? The fetch queue lock is only released once the replica manager has returned the records. This strikes me as a bad thing.
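
   To make the concern concrete, here is a sketch (not the PR's code) of the ordering the comment is hinting at. It assumes the per-partition fetch locks alone are enough to protect share-partition state, and it uses hypothetical stand-ins: scalaPartitionData for the converted map built in the diff, responseCallback for the response-handling lambda, and releasePartitionsLock(...) for a helper that releases only the partition locks.

    // Sketch only -- releases the queue-processing flag as soon as the fetch has
    // been handed to the replica manager, instead of waiting for the response.
    // Only the per-partition fetch locks stay held until responseCallback (which
    // would call the hypothetical releasePartitionsLock) has run.
    replicaManager.fetchMessages(
        shareFetchPartitionData.fetchParams,
        scalaPartitionData,
        QuotaFactory.UnboundedQuota$.MODULE$,
        responseCallback);
    // Other queued share-fetch requests can now be processed concurrently with
    // the outstanding replica-manager fetch.
    releaseProcessFetchQueueLock();

   Whether dispatching a second fetch before the first response arrives is actually safe hinges on the per-partition locks, which is exactly the ordering question the comment raises.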