adixitconfluent commented on code in PR #16263: URL: https://github.com/apache/kafka/pull/16263#discussion_r1636439195
########## core/src/main/java/kafka/server/share/SharePartitionManager.java: ########## @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server.share; + +import kafka.server.FetchSession; +import kafka.server.ReplicaManager; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ShareSessionNotFoundException; +import org.apache.kafka.common.message.ShareAcknowledgeResponseData; +import org.apache.kafka.common.message.ShareFetchResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ShareFetchMetadata; +import org.apache.kafka.common.requests.ShareFetchRequest; +import org.apache.kafka.common.requests.ShareFetchResponse; +import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.share.CachedSharePartition; +import org.apache.kafka.server.share.ShareSession; +import org.apache.kafka.server.share.ShareSessionCache; +import org.apache.kafka.server.share.ShareSessionKey; +import org.apache.kafka.storage.internals.log.FetchParams; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions. + * It is responsible for fetching messages from the log and acknowledging the messages. + */ +public class SharePartitionManager implements AutoCloseable { + + private final static Logger log = LoggerFactory.getLogger(SharePartitionManager.class); + + /** + * The partition cache map is used to store the SharePartition objects for each share group topic-partition. + */ + private final Map<SharePartitionKey, SharePartition> partitionCacheMap; + + /** + * The replica manager is used to fetch messages from the log. + */ + private final ReplicaManager replicaManager; + + /** + * The time instance is used to get the current time. + */ + private final Time time; + + /** + * The share session cache stores the share sessions. + */ + private final ShareSessionCache cache; + + /** + * The fetch queue stores the share fetch requests that are waiting to be processed. + */ + private final ConcurrentLinkedQueue<ShareFetchPartitionData> fetchQueue; + + /** + * The process fetch queue lock is used to ensure that only one thread is processing the fetch queue at a time. + */ + private final AtomicBoolean processFetchQueueLock; + + /** + * The record lock duration is the time in milliseconds that a record lock is held for. 
+ */ + private final int recordLockDurationMs; + + /** + * The max in flight messages is the maximum number of messages that can be in flight at any one time per share-partition. + */ + private final int maxInFlightMessages; + + /** + * The max delivery count is the maximum number of times a message can be delivered before it is considered to be archived. + */ + private final int maxDeliveryCount; + + public SharePartitionManager( + ReplicaManager replicaManager, + Time time, + ShareSessionCache cache, + int recordLockDurationMs, + int maxDeliveryCount, + int maxInFlightMessages + ) { + this(replicaManager, time, cache, new ConcurrentHashMap<>(), recordLockDurationMs, maxDeliveryCount, maxInFlightMessages); + } + + SharePartitionManager( + ReplicaManager replicaManager, + Time time, + ShareSessionCache cache, + Map<SharePartitionKey, SharePartition> partitionCacheMap, + int recordLockDurationMs, + int maxDeliveryCount, + int maxInFlightMessages + ) { + this.replicaManager = replicaManager; + this.time = time; + this.cache = cache; + this.partitionCacheMap = partitionCacheMap; + this.fetchQueue = new ConcurrentLinkedQueue<>(); + this.processFetchQueueLock = new AtomicBoolean(false); + this.recordLockDurationMs = recordLockDurationMs; + this.maxDeliveryCount = maxDeliveryCount; + this.maxInFlightMessages = maxInFlightMessages; + } + + /** + * The fetch messages method is used to fetch messages from the log for the specified topic-partitions. + * The method returns a future that will be completed with the fetched messages. + * + * @param groupId The group id, this is used to identify the share group. + * @param memberId The member id, generated by the group-coordinator, this is used to identify the client. + * @param fetchParams The fetch parameters from the share fetch request. + * @param topicIdPartitions The topic-partitions to fetch messages for. + * @param partitionMaxBytes The maximum number of bytes to fetch for each partition. 
+ * + * @return A future that will be completed with the fetched messages. + */ + public CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> fetchMessages( + String groupId, + String memberId, + FetchParams fetchParams, + List<TopicIdPartition> topicIdPartitions, + Map<TopicIdPartition, Integer> partitionMaxBytes + ) { + log.trace("Fetch request for topicIdPartitions: {} with groupId: {} fetch params: {}", + topicIdPartitions, groupId, fetchParams); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Not implemented yet")); + + return future; + } + + /** + * The acknowledge method is used to acknowledge the messages that have been fetched. + * The method returns a future that will be completed with the acknowledge response. + * + * @param memberId The member id, generated by the group-coordinator, this is used to identify the client. + * @param groupId The group id, this is used to identify the share group. + * @param acknowledgeTopics The acknowledge topics and their corresponding acknowledge batches. + * + * @return A future that will be completed with the acknowledge response. + */ + public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> acknowledge( + String memberId, + String groupId, + Map<TopicIdPartition, List<SharePartition.AcknowledgementBatch>> acknowledgeTopics + ) { + log.trace("Acknowledge request for topicIdPartitions: {} with groupId: {}", + acknowledgeTopics.keySet(), groupId); + + CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Not implemented yet")); + + return future; + } + + /** + * The release acquired records method is used to release the acquired records for the specified topic-partitions. 
+ * The method returns a future that will be completed with the release response. + * + * @param groupId The group id, this is used to identify the share group. + * @param memberId The member id, generated by the group-coordinator, this is used to identify the client. + * @param topicIdPartitions The topic-partitions to release the acquired records for. + * + * @return A future that will be completed with the release response. + */ + public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> releaseAcquiredRecords( + String groupId, + String memberId, + List<TopicIdPartition> topicIdPartitions + ) { + log.trace("Release acquired records request for topicIdPartitions: {} with groupId: {}", + topicIdPartitions, groupId); + + CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException("Not implemented yet")); + + return future; + } + + public ShareFetchContext newContext(String groupId, Map<TopicIdPartition, + ShareFetchRequest.SharePartitionData> shareFetchData, List<TopicIdPartition> toForget, ShareFetchMetadata reqMetadata) { + ShareFetchContext context; + // TopicPartition with maxBytes as 0 should not be added in the cachedPartitions + Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchDataWithMaxBytes = new HashMap<>(); + shareFetchData.forEach((tp, sharePartitionData) -> { + if (sharePartitionData.maxBytes > 0) shareFetchDataWithMaxBytes.put(tp, sharePartitionData); + }); + if (reqMetadata.isFull()) { + ShareSessionKey key = shareSessionKey(groupId, reqMetadata.memberId()); + String removedFetchSessionStr; + if (reqMetadata.epoch() == ShareFetchMetadata.FINAL_EPOCH) { + // If the epoch is FINAL_EPOCH, don't try to create a new session. 
+ if (!shareFetchDataWithMaxBytes.isEmpty()) { + throw Errors.INVALID_REQUEST.exception(); + } + context = new FinalContext(); + synchronized (cache) { + if (cache.remove(key) != null) { + removedFetchSessionStr = "Removed share session with key " + key; + log.debug(removedFetchSessionStr); + } + } + } else { + if (cache.remove(key) != null) { + removedFetchSessionStr = "Removed share session with key " + key; + log.debug(removedFetchSessionStr); + } + ImplicitLinkedHashCollection<CachedSharePartition> cachedSharePartitions = new + ImplicitLinkedHashCollection<>(shareFetchDataWithMaxBytes.size()); + shareFetchDataWithMaxBytes.forEach((topicIdPartition, reqData) -> + cachedSharePartitions.mustAdd(new CachedSharePartition(topicIdPartition, reqData, false))); + ShareSessionKey responseShareSessionKey = cache.maybeCreateSession(groupId, reqMetadata.memberId(), + time.milliseconds(), shareFetchDataWithMaxBytes.size(), cachedSharePartitions); + log.debug("Share session context with key {} isSubsequent {} returning {}", responseShareSessionKey, + false, partitionsToLogString(shareFetchDataWithMaxBytes.keySet())); + + context = new ShareSessionContext(reqMetadata, shareFetchDataWithMaxBytes); + log.debug("Created a new ShareSessionContext with {}. 
A new share session will be started.", + partitionsToLogString(shareFetchDataWithMaxBytes.keySet())); + } + } else { + synchronized (cache) { + ShareSessionKey key = shareSessionKey(groupId, reqMetadata.memberId()); + ShareSession shareSession = cache.get(key); + if (shareSession == null) { + log.debug("Share session error for {}: no such share session found", key); + throw Errors.SHARE_SESSION_NOT_FOUND.exception(); + } else { + if (shareSession.epoch != reqMetadata.epoch()) { + log.debug("Share session error for {}: expected epoch {}, but got {} instead", key, + shareSession.epoch, reqMetadata.epoch()); + throw Errors.INVALID_SHARE_SESSION_EPOCH.exception(); + } else { + Map<ShareSession.ModifiedTopicIdPartitionType, List<TopicIdPartition>> modifiedTopicIdPartitions = shareSession.update( + shareFetchDataWithMaxBytes, toForget); + cache.touch(shareSession, time.milliseconds()); + shareSession.epoch = ShareFetchMetadata.nextEpoch(shareSession.epoch); + log.debug("Created a new ShareSessionContext for session key {}, epoch {}: " + + "added {}, updated {}, removed {}", shareSession.key(), shareSession.epoch, + partitionsToLogString(modifiedTopicIdPartitions.get( + ShareSession.ModifiedTopicIdPartitionType.ADDED)), + partitionsToLogString(modifiedTopicIdPartitions.get(ShareSession.ModifiedTopicIdPartitionType.UPDATED)), + partitionsToLogString(modifiedTopicIdPartitions.get(ShareSession.ModifiedTopicIdPartitionType.REMOVED)) + ); + context = new ShareSessionContext(reqMetadata, shareSession); + } + } + } + } + return context; + } + + private ShareSessionKey shareSessionKey(String groupId, Uuid memberId) { + return new ShareSessionKey(groupId, memberId); + } + + String partitionsToLogString(Collection<TopicIdPartition> partitions) { + return FetchSession.partitionsToLogString(partitions, log.isTraceEnabled()); + } + + public List<TopicIdPartition> cachedTopicIdPartitionsInShareSession(String groupId, Uuid memberId) { + ShareSessionKey key = shareSessionKey(groupId, 
memberId); + ShareSession shareSession = cache.get(key); + if (shareSession == null) { + throw new ShareSessionNotFoundException("Share session not found in cache"); + } + List<TopicIdPartition> cachedTopicIdPartitions = new ArrayList<>(); + shareSession.partitionMap().forEach(cachedSharePartition -> cachedTopicIdPartitions.add( + new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition() + )))); + return cachedTopicIdPartitions; + } + + @Override + public void close() throws Exception { + // TODO: Provide Implementation + } + + /** + * The SharePartitionKey is used to uniquely identify a share partition. The key is made up of the + * share group id, the topic id and the partition id. The key is used to store the SharePartition + * objects in the partition cache map. + */ + // Visible for testing + static class SharePartitionKey { + private final String groupId; + private final TopicIdPartition topicIdPartition; + + public SharePartitionKey(String groupId, TopicIdPartition topicIdPartition) { + this.groupId = Objects.requireNonNull(groupId); + this.topicIdPartition = Objects.requireNonNull(topicIdPartition); + } + + @Override + public int hashCode() { + return Objects.hash(groupId, topicIdPartition); + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) + return true; + else if (obj == null || getClass() != obj.getClass()) + return false; + else { + SharePartitionKey that = (SharePartitionKey) obj; + return groupId.equals(that.groupId) && Objects.equals(topicIdPartition, that.topicIdPartition); + } + } + } + + /** + * The ShareFetchPartitionData class is used to store the fetch parameters for a share fetch request. 
+ */ + private static class ShareFetchPartitionData { + // TODO: Provide Implementation + } + + // Helper class to return the erroneous partitions and valid partition data + public static class ErroneousAndValidPartitionData { Review Comment: Since this is a helper class, I don't see a need to move it outside this class. We can, however, change it from `public` to package-private (i.e. default access, with no modifier). Will that be fine? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org