AndrewJSchofield commented on code in PR #17322: URL: https://github.com/apache/kafka/pull/17322#discussion_r1801117898
########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -1058,36 +1084,57 @@ private void maybeCompleteInitialization(CompletableFuture<Void> future) { private AcquiredRecords acquireNewBatchRecords( String memberId, + Iterable<? extends RecordBatch> batches, long firstOffset, - long lastOffset + long lastOffset, + int maxMessages Review Comment: We've been using `maxFetchRecords` so far, so I don't think we should be using `messages` now. `maxRecords` would be better. ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -1058,36 +1084,57 @@ private void maybeCompleteInitialization(CompletableFuture<Void> future) { private AcquiredRecords acquireNewBatchRecords( String memberId, + Iterable<? extends RecordBatch> batches, long firstOffset, - long lastOffset + long lastOffset, + int maxMessages ) { lock.writeLock().lock(); try { + // If same batch is fetched and previous batch is removed from the cache then we need to + // update the batch first offset to endOffset, only if enfOffset is passed the firstOffset. + // For an initial start of the share fetch from a topic partition the endOffset will be initialized + // to 0 but firstOffset can be higher than 0. + long firstAcquiredOffset = firstOffset; + if (cachedState.isEmpty() && endOffset > firstAcquiredOffset) { + firstAcquiredOffset = endOffset; + } + + // Check how many messages can be acquired from the batch. + long lastAcquiredOffset = lastOffset; + if (maxMessages < (lastAcquiredOffset - firstAcquiredOffset + 1)) { Review Comment: nit: I think you can lose the inner parentheses for readability. ########## share/src/main/java/org/apache/kafka/server/share/fetch/ShareAcquiredRecords.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.share.fetch; + +import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords; + +import java.util.Collections; +import java.util.List; + +/** + * The ShareAcquiredRecords class is used to send the acquired records and associated metadata. + */ +public class ShareAcquiredRecords { + + public static final ShareAcquiredRecords EMPTY_SHARE_ACQUIRED_RECORDS = new ShareAcquiredRecords(); + + /** + * The list of acquired records. + */ + private final List<AcquiredRecords> records; + /** + * The number of offsets acquired. The acquired records has a first and last offset, and the count + * is the actual number of offsets acquired. + */ + private final int count; + + public ShareAcquiredRecords( + List<AcquiredRecords> records, + int count + ) { + this.records = records; + this.count = count; + } + + private ShareAcquiredRecords() { + this.records = Collections.emptyList(); + this.count = 0; + } + + public List<AcquiredRecords> records() { + return records; + } + + public int count() { + return count; + } + + public static ShareAcquiredRecords emptyShareAcquiredRecords() { Review Comment: I'd just call it `empty()`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java: ########## @@ -79,7 +85,8 @@ public class ShareGroupConfig { .define(SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT, between(30000, 3600000), MEDIUM, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC) .define(SHARE_GROUP_MAX_GROUPS_CONFIG, SHORT, SHARE_GROUP_MAX_GROUPS_DEFAULT, between(1, 100), MEDIUM, SHARE_GROUP_MAX_GROUPS_DOC) .define(SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 10000), MEDIUM, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC) - .define(SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC);; + .define(SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC) + .define(SHARE_FETCH_MAX_FETCH_RECORDS_CONFIG, INT, SHARE_FETCH_MAX_FETCH_RECORDS_DEFAULT, HIGH, SHARE_FETCH_MAX_FETCH_RECORDS_DOC); Review Comment: `defineInternal` ########## share/src/main/java/org/apache/kafka/server/share/fetch/ShareAcquiredRecords.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.share.fetch; + +import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords; + +import java.util.Collections; +import java.util.List; + +/** + * The ShareAcquiredRecords class is used to send the acquired records and associated metadata. + */ +public class ShareAcquiredRecords { + + public static final ShareAcquiredRecords EMPTY_SHARE_ACQUIRED_RECORDS = new ShareAcquiredRecords(); + + /** + * The list of acquired records. + */ + private final List<AcquiredRecords> records; + /** + * The number of offsets acquired. The acquired records has a first and last offset, and the count + * is the actual number of offsets acquired. + */ + private final int count; + + public ShareAcquiredRecords( + List<AcquiredRecords> records, + int count + ) { + this.records = records; + this.count = count; + } + + private ShareAcquiredRecords() { + this.records = Collections.emptyList(); + this.count = 0; + } + + public List<AcquiredRecords> records() { + return records; + } + + public int count() { + return count; + } + + public static ShareAcquiredRecords emptyShareAcquiredRecords() { + return EMPTY_SHARE_ACQUIRED_RECORDS; + } + + public static ShareAcquiredRecords fromAcquiredRecords(AcquiredRecords acquiredRecords) { + return new ShareAcquiredRecords( + Collections.singletonList(acquiredRecords), Review Comment: `List.of` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org