apoorvmittal10 commented on code in PR #17739:
URL: https://github.com/apache/kafka/pull/17739#discussion_r1842273669


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1602,8 +1602,6 @@ protected void 
updateFetchOffsetMetadata(Optional<LogOffsetMetadata> fetchOffset
     protected Optional<LogOffsetMetadata> fetchOffsetMetadata() {
         lock.readLock().lock();
         try {
-            if (findNextFetchOffset.get())

Review Comment:
   Thanks @adixitconfluent for confirming. The simplest way I can think of to 
handle common and uncommon case:
   
   - We shall maintain `fetchOffsetMetadata` only corresponding to the end 
offset and not for intermediate states, as that will are most common case i.e. 
fetch from last.
   - The endOffset shall always point to the first offfset of any log batch 
hence if we fetch `fetchOffsetMetadata` for end offset then it should match 
with `fetchOffsetMetadata.messageOffset`.
   
   - DelayedShareFetch arrives with fetchOffset which could match the 
partitions endOffset or anywhere earlier, delayed share fetch doesn't know. 
Hence call `fetchOffsetMetadata()` for share partition which if provides the 
`fetchOffsetMetadata` then check:
           *  if the messageOffset matches with fetchOffset. If yes then we 
already have latest copy.
           * If empty, then we anyways need to readFromLog and update 
`fetchOffsetMetadata`.
           * If it is ahead of fetchOffset then we are fetching for released 
records. Hence we `readFromLog` and keep the `fetchOffsetMetadata` for the 
partition in DelayedShareFetch itself, which should be re-used. I am suggesting 
to keep it locally itself because if timeout occurs for request then anyways we 
will complete the request with whatever we can fetch and there is guaranteed 
that some messages will be acquired. Hence this intermediate 
`fetchOffsetMetadata` is only relevant for single session of DelayedShareFetch 
request. 
        
   - Change `updateFetchOffsetMetadata(Optional<LogOffsetMetadata> 
fetchOffsetMetadata)` in SharePartition to `boolean 
maybeUpdateFetchOffsetMetadata(Optional<LogOffsetMetadata> fetchOffsetMetadata, 
long fetchOffset)` and only keep the `fetchOffsetMetadata` for the endOffset. 
Check if the fetchOffset matches the endOffset.
   - `fetchOffsetMetadata()` should just return stored `fetchOffsetMetadata` 
without any check.
   
   @junrao wdyt? Is there gap in my overall understanding?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to