AndrewJSchofield commented on code in PR #19010: URL: https://github.com/apache/kafka/pull/19010#discussion_r1966728162
########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -615,13 +615,44 @@ public long nextFetchOffset() { * Acquire the fetched records for the share partition. The acquired records are added to the * in-flight records and the next fetch offset is updated to the next offset that should be * fetched from the leader. + * <p> + * The method always acquire the full batch records. The cache state can consist of multiple + * full batches as a single batch. This behavior is driven by client configurations (batch size + * and max fetch records) and allows for efficient client acknowledgements. However, partial batches + * can exist in the cache only after a leader change and partial acknowledgements have been persisted + * prior leader change. In such case, when a share partition loses track of a batch's start and + * end offsets (e.g., after a leader change and partial acknowledgements), the cache stores the + * batch based on the offset range provided by the persister. This method handles these special + * batches by maintaining this range up to the last offset returned by the persister. No special + * handling is required after wards; the cache will eventually return to managing full batches. + * <p> + * For compacted topics, batches may be non-contiguous, and records within cached batches may contain gaps. + * Because this method operates at the batch level, it acquires entire batches and relies on the + * client to report any gaps in the data. Whether non-contiguous batches are acquired depends on + * the first and last offsets of the fetched batches. Batches outside of this boundary will never + * be acquired. For instance, if fetched batches cover offsets [0-9 and 20-29], and the configured + * batch size and maximum fetch records are large enough (greater than 30 in this example), the + * intervening batch [10-19] will be acquired. Since full fetched batch is acquired, the client is + * responsible for reporting any data gaps. However, if the [0-9] and [20-29] ranges are fetched + * in separate calls to this method, the [10-19] batch will not be acquired and cannot exist in + * the cache. + * <p> + * However, for compacted topics, previously acquired batches (e.g., due to acquisition lock timeout + * or explicit client release) might become available for acquisition again. But subsequent fetches + * may reveal that these batches, or parts of them, have been removed by compaction. Because this + * method works with whole batches, the disappearance of individual offsets within a batch requires + * no special handling; the batch will be re-acquired, and the client will report the gaps. But if + * an entire batch has been compacted away, this method must archive it in the cache to allow the + * Share Partition Start Offset (SPSO) to progress. This is accomplished by comparing the fetchOffset + * (the offset from which the log was read) with the first base offset of the fetch response. Any + * batches from fetchOffset to first base offset of the fetch response are archived. * * @param memberId The member id of the client that is fetching the record. * @param batchSize The number of records per acquired records batch. * @param maxFetchRecords The maximum number of records that should be acquired, this is a soft * limit and the method might acquire more records than the maxFetchRecords, * if the records are already part of the same fetch batch. -* * @param fetchOffset The fetch offset for which the records are fetched. + * @param fetchOffset The fetch offset for which the records are fetched. Review Comment: There's now an out-of-date comment on the declaration of `fetchOffset`. You've now started to use the parameter. ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -1061,33 +1098,103 @@ void updateCacheAndOffsets(long logStartOffset) { } } + /** + * The method archives the available records in the cached state that are between the fetch offset + * and the base offset of the first fetched batch. This method is required to handle the compacted + * topics where the already fetched batch which is marked re-available, might not result in subsequent + * fetch response from log. Hence, the batches need to be archived to allow the SPSO and next fetch + * offset to progress. + * + * @param fetchOffset The fetch offset. + * @param baseOffset The base offset of the first fetched batch. + */ + private void maybeArchiveStaleBatches(long fetchOffset, long baseOffset) { + lock.writeLock().lock(); + try { + // If the fetch happens from within a batch then fetchOffset can be ahead of base offset else + // should be same as baseOffset of the first fetched batch. Otherwise, we might need to archive + // some stale batches. + if (cachedState.isEmpty() || fetchOffset >= baseOffset) { + // No stale batches to archive. + return; + } + + // The fetch offset can exist in the middle of the batch. Hence, find the floor offset + // for the fetch offset and then find the sub-map from the floor offset to the base offset. + long mapFetchOffset = fetchOffset; + Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(fetchOffset); + if (floorOffset != null && floorOffset.getValue().lastOffset() >= fetchOffset) { + mapFetchOffset = floorOffset.getKey(); + } + + NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(mapFetchOffset, true, baseOffset, false); + if (subMap.isEmpty()) { + // No stale batches to archive. + return; + } + // Though such batches can be removed from the cache, but it is better to archive them so + // that they are never acquired again. + boolean anyRecordArchived = archiveAvailableRecords(fetchOffset, baseOffset, subMap); + // If we have transitioned the state of any batch/offset from AVAILABLE to ARCHIVED, + // then there is a chance that the next fetch offset can change. + if (anyRecordArchived) { + findNextFetchOffset.set(true); + } + } finally { + lock.writeLock().unlock(); + } + } + + /** + * The method archives the available records in the cached state that are before the log start offset. + * + * @param logStartOffset The log start offset. + * @return A boolean which indicates whether any record is archived or not. + */ private boolean archiveAvailableRecordsOnLsoMovement(long logStartOffset) { + lock.writeLock().lock(); + try { + return archiveAvailableRecords(startOffset, logStartOffset, cachedState); + } finally { + lock.writeLock().unlock(); + } + } + + /** + * The method archive the available records in the given map that are before the end offset. + * + * @param startOffset The offset from which the available records should be archived. + * @param endOffset The offset before which the available records should be archived. + * @param map The map containing the in-flight records. + * @return A boolean which indicates whether any record is archived or not. + */ + private boolean archiveAvailableRecords(long startOffset, long endOffset, NavigableMap<Long, InFlightBatch> map) { lock.writeLock().lock(); Review Comment: Just observing that all callers already hold the write lock. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org