omkreddy commented on code in PR #16263: URL: https://github.com/apache/kafka/pull/16263#discussion_r1637678486
########## core/src/main/java/kafka/server/share/FinalContext.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server.share; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ShareFetchResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ShareFetchResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.LinkedHashMap; + +/** + * The share fetch context for a final share fetch request. + */ +public class FinalContext extends ShareFetchContext { + + private final static Logger log = LoggerFactory.getLogger(FinalContext.class); + + public FinalContext() { + } + + @Override + boolean isTraceEnabled() { + return log.isTraceEnabled(); + } + + @Override + int responseSize(LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates, short version) { + return ShareFetchResponse.sizeOf(version, updates.entrySet().iterator()); + } + + @Override + ShareFetchResponse updateAndGenerateResponseData(String groupId, Uuid memberId, + LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) { + log.debug("Final context returning {}", partitionsToLogString(updates.keySet())); + return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, 0, + updates.entrySet().iterator(), Collections.emptyList())); + } + + @Override + ErroneousAndValidPartitionData getErroneousAndValidTopicIdPartitions() { + return new ErroneousAndValidPartitionData(); + } +} Review Comment: nit: missing new line ########## core/src/main/java/kafka/server/share/SharePartitionManager.java: ########## @@ -191,6 +208,97 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part return future; } + public ShareFetchContext newContext(String groupId, Map<TopicIdPartition, Review Comment: nit: missing javadoc ########## core/src/main/java/kafka/server/share/SharePartitionManager.java: ########## @@ -191,6 +208,97 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part return future; } + public ShareFetchContext newContext(String groupId, Map<TopicIdPartition, + ShareFetchRequest.SharePartitionData> shareFetchData, List<TopicIdPartition> toForget, ShareFetchMetadata reqMetadata) { + ShareFetchContext context; + // TopicPartition with maxBytes as 0 should not be added in the cachedPartitions + Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchDataWithMaxBytes = new HashMap<>(); + shareFetchData.forEach((tp, sharePartitionData) -> { + if (sharePartitionData.maxBytes > 0) shareFetchDataWithMaxBytes.put(tp, sharePartitionData); + }); + // If the request's epoch is FINAL_EPOCH or INITIAL_EPOCH, we should remove the existing sessions. Also, start a + // new session in case it is INITIAL_EPOCH. Hence, we need to treat them as special cases. + if (reqMetadata.isFull()) { + ShareSessionKey key = shareSessionKey(groupId, reqMetadata.memberId()); + if (reqMetadata.epoch() == ShareFetchMetadata.FINAL_EPOCH) { + // If the epoch is FINAL_EPOCH, don't try to create a new session. + if (!shareFetchDataWithMaxBytes.isEmpty()) { + throw Errors.INVALID_REQUEST.exception(); + } + context = new FinalContext(); + synchronized (cache) { + if (cache.remove(key) != null) { + log.debug("Removed share session with key {}", key); + } + } + } else { + if (cache.remove(key) != null) { + log.debug("Removed share session with key {}", key); + } + ImplicitLinkedHashCollection<CachedSharePartition> cachedSharePartitions = new + ImplicitLinkedHashCollection<>(shareFetchDataWithMaxBytes.size()); + shareFetchDataWithMaxBytes.forEach((topicIdPartition, reqData) -> + cachedSharePartitions.mustAdd(new CachedSharePartition(topicIdPartition, reqData, false))); + ShareSessionKey responseShareSessionKey = cache.maybeCreateSession(groupId, reqMetadata.memberId(), Review Comment: what if responseShareSessionKey is null here? ########## server/src/main/java/org/apache/kafka/server/share/ShareSession.java: ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.share; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.requests.ShareFetchRequest; +import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; + +public class ShareSession { + + // Helper enum to return the possible type of modified list of TopicIdPartitions in cache + public enum ModifiedTopicIdPartitionType { + ADDED, + UPDATED, + REMOVED + } + + private final ShareSessionKey key; + private final ImplicitLinkedHashCollection<CachedSharePartition> partitionMap; + private final long creationMs; + + private long lastUsedMs; + // visible for testing + public int epoch; + // This is used by the ShareSessionCache to store the last known size of this session. + // If this is -1, the Session is not in the cache. + private int cachedSize = -1; + + /** + * The share session. + * Each share session is protected by its own lock, which must be taken before mutable + * fields are read or modified. This includes modification of the share session partition map. + * + * @param key The share session key to identify the share session uniquely. + * @param partitionMap The CachedPartitionMap. + * @param creationMs The time in milliseconds when this share session was created. + * @param lastUsedMs The last used time in milliseconds. This should only be updated by + * ShareSessionCache#touch. + * @param epoch The share session sequence number. + */ + public ShareSession(ShareSessionKey key, ImplicitLinkedHashCollection<CachedSharePartition> partitionMap, + long creationMs, long lastUsedMs, int epoch) { + this.key = key; + this.partitionMap = partitionMap; + this.creationMs = creationMs; + this.lastUsedMs = lastUsedMs; + this.epoch = epoch; + } + + public ShareSessionKey key() { + return key; + } + + synchronized public int cachedSize() { + return cachedSize; + } + + synchronized public void cachedSize(int size) { + cachedSize = size; + } + + synchronized public long lastUsedMs() { + return lastUsedMs; + } + + synchronized public void lastUsedMs(long ts) { + lastUsedMs = ts; + } + + synchronized public ImplicitLinkedHashCollection<CachedSharePartition> partitionMap() { + return partitionMap; + } + + // Visible for testing + synchronized public int epoch() { + return epoch; + } + + synchronized public int size() { + return partitionMap.size(); + } + + synchronized public Boolean isEmpty() { + return partitionMap.isEmpty(); + } + + synchronized public LastUsedKey lastUsedKey() { + return new LastUsedKey(key, lastUsedMs); + } + + // Visible for testing + synchronized public long creationMs() { + return creationMs; + } + + // Update the cached partition data based on the request. + public Map<ModifiedTopicIdPartitionType, List<TopicIdPartition>> update(Map<TopicIdPartition, Review Comment: Can we make this synchronized method -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org