rhauch edited a comment on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-910540773


   My previous suggestion simply dequeues all completed records until an 
unacked record is found. This is really straightforward, but we could try to do 
better.
   
   We could dequeue every acked record except those whose source partition has 
an earlier unacked record. For example, let's say we have enqueued 10 records 
when `commitOffsets()` is called:
   ```
   1. SubmittedRecord{ SourceRecord{...partition=P1,offset=O1...}, acked=true}
   2. SubmittedRecord{ SourceRecord{...partition=P1,offset=O2...}, acked=true}
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   4. SubmittedRecord{ SourceRecord{...partition=P2,offset=O1...}, acked=true}
   5. SubmittedRecord{ SourceRecord{...partition=P2,offset=O2...}, acked=true}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   7. SubmittedRecord{ SourceRecord{...partition=P2,offset=O3...}, acked=true}
   8. SubmittedRecord{ SourceRecord{...partition=P2,offset=O4...}, acked=true}
   9. SubmittedRecord{ SourceRecord{...partition=P2,offset=O5...}, acked=true}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   This might happen if records 4, 5, and 7-10 were written to different topic 
partitions than records 1, 2, 3, and 6, and the producer is stuck on the latter 
partitions.
   
   With the simplistic logic, we'd only dequeue records 1 and 2, add the 
offsets for these two records to the offset writer, and flush offset 
`partition=P1,offset=O2`. We'd end up with the following remaining in the queue 
(using the same record numbers as before):
   ```
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   4. SubmittedRecord{ SourceRecord{...partition=P2,offset=O1...}, acked=true}
   5. SubmittedRecord{ SourceRecord{...partition=P2,offset=O2...}, acked=true}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   7. SubmittedRecord{ SourceRecord{...partition=P2,offset=O3...}, acked=true}
   8. SubmittedRecord{ SourceRecord{...partition=P2,offset=O4...}, acked=true}
   9. SubmittedRecord{ SourceRecord{...partition=P2,offset=O5...}, acked=true}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   There are quite a few records with source partition `P2` that were acked but 
not dequeued, simply because they were behind an unacked record with a 
different source partition.
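
The simplistic logic can be sketched roughly as follows. This is only an illustration, not the code in this PR: `HeadOnlyDequeue`, `Submitted`, `dequeueCommittable` and `exampleQueue` are hypothetical names, and source partitions and offsets are modeled as plain strings rather than the maps Connect actually uses.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

class HeadOnlyDequeue {
    // Hypothetical stand-in for SubmittedRecord; partition and offset are
    // plain strings here (an assumption made to keep the sketch short).
    static final class Submitted {
        final String partition;
        final String offset;
        final boolean acked;

        Submitted(String partition, String offset, boolean acked) {
            this.partition = partition;
            this.offset = offset;
            this.acked = acked;
        }
    }

    // Polls acked records off the head of the queue and stops at the first
    // unacked one. Returns the last offset seen for each source partition.
    static Map<String, String> dequeueCommittable(Deque<Submitted> queue) {
        Map<String, String> offsets = new LinkedHashMap<>();
        while (!queue.isEmpty() && queue.peekFirst().acked) {
            Submitted head = queue.pollFirst();
            offsets.put(head.partition, head.offset);
        }
        return offsets;
    }

    // Builds the ten-record queue from the example above.
    static Deque<Submitted> exampleQueue() {
        Deque<Submitted> queue = new ArrayDeque<>();
        queue.add(new Submitted("P1", "O1", true));
        queue.add(new Submitted("P1", "O2", true));
        queue.add(new Submitted("P1", "O3", false));
        queue.add(new Submitted("P2", "O1", true));
        queue.add(new Submitted("P2", "O2", true));
        queue.add(new Submitted("P1", "O4", false));
        queue.add(new Submitted("P2", "O3", true));
        queue.add(new Submitted("P2", "O4", true));
        queue.add(new Submitted("P2", "O5", true));
        queue.add(new Submitted("P2", "O6", false));
        return queue;
    }

    public static void main(String[] args) {
        Deque<Submitted> queue = exampleQueue();
        System.out.println(dequeueCommittable(queue)); // {P1=O2}
        System.out.println(queue.size());              // 8
    }
}
```

Only the offset for `P1` is returned, and eight records stay queued, five of them already acked.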
   
   However, if we dequeue all acked records whose source partition map does 
not match that of an earlier unacked record, then we'd be able to dequeue more 
records and *also* flush offsets `partition=P1,offset=O2` and 
`partition=P2,offset=O5`. We'd end up with a much smaller queue (again, using 
the same record numbers as before):
   ```
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   This minor change will dramatically improve the ability to commit offsets 
closer to what has actually been acked.
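
A minimal sketch of the partition-aware approach, under some assumptions: `PartitionAwareDequeue`, `Submitted`, `dequeueCommittable` and `exampleQueue` are hypothetical names, source partitions and offsets are plain strings instead of maps, and a single pass over the queue with a set of "blocked" partitions is just one possible way to do it, not necessarily what this PR should implement.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class PartitionAwareDequeue {
    // Hypothetical stand-in for SubmittedRecord; partition and offset are
    // plain strings here (an assumption made to keep the sketch short).
    static final class Submitted {
        final String partition;
        final String offset;
        final boolean acked;

        Submitted(String partition, String offset, boolean acked) {
            this.partition = partition;
            this.offset = offset;
            this.acked = acked;
        }
    }

    // Walks the queue in order. An unacked record blocks its source partition;
    // an acked record is removed only if its partition is not yet blocked.
    // Returns the last committable offset for each source partition.
    static Map<String, String> dequeueCommittable(Deque<Submitted> queue) {
        Map<String, String> offsets = new LinkedHashMap<>();
        Set<String> blocked = new HashSet<>();
        Iterator<Submitted> it = queue.iterator();
        while (it.hasNext()) {
            Submitted next = it.next();
            if (!next.acked) {
                blocked.add(next.partition);
            } else if (!blocked.contains(next.partition)) {
                offsets.put(next.partition, next.offset);
                it.remove();
            }
        }
        return offsets;
    }

    // Builds the ten-record queue from the example above.
    static Deque<Submitted> exampleQueue() {
        Deque<Submitted> queue = new ArrayDeque<>();
        queue.add(new Submitted("P1", "O1", true));
        queue.add(new Submitted("P1", "O2", true));
        queue.add(new Submitted("P1", "O3", false));
        queue.add(new Submitted("P2", "O1", true));
        queue.add(new Submitted("P2", "O2", true));
        queue.add(new Submitted("P1", "O4", false));
        queue.add(new Submitted("P2", "O3", true));
        queue.add(new Submitted("P2", "O4", true));
        queue.add(new Submitted("P2", "O5", true));
        queue.add(new Submitted("P2", "O6", false));
        return queue;
    }

    public static void main(String[] args) {
        Deque<Submitted> queue = exampleQueue();
        System.out.println(dequeueCommittable(queue)); // {P1=O2, P2=O5}
        System.out.println(queue.size());              // 3
    }
}
```

For the ten-record example this leaves only records 3, 6, and 10 in the queue. The `P2` offset it returns is `O5`, the offset of record 9, which is the last acked `P2` record ahead of the unacked record 10. The cost is a full scan of the queue on each commit instead of stopping at the first unacked record.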

