rhauch edited a comment on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-910540773


   My previous suggestion simply dequeues all completed records until an 
unacked record is found. This is really straightforward, but we could try to do 
better.
   
   We could dequeue all records except those that have a source partition that 
has not been acked. For example, let's say we have enqueue 4 records when 
`commitOffsets()` is called:
   ```
   1. SubmittedRecord{ SourceRecord{...partition=P1,offset=O1...}, acked=true}
   2. SubmittedRecord{ SourceRecord{...partition=P1,offset=O2...}, acked=true}
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   4. SubmittedRecord{ SourceRecord{...partition=P2,offset=O1...}, acked=true}
   5. SubmittedRecord{ SourceRecord{...partition=P2,offset=O2...}, acked=true}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   7. SubmittedRecord{ SourceRecord{...partition=P2,offset=O3...}, acked=true}
   8. SubmittedRecord{ SourceRecord{...partition=P2,offset=O4...}, acked=true}
   9. SubmittedRecord{ SourceRecord{...partition=P2,offset=O5...}, acked=true}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   This might happen if records 4, 5, and 7-10 were written to different topic 
partitions than records 1, 2, 3, and 6, and the producer is stuck on the latter 
partitions.
   
   With the simplistic logic, we'd only dequeue record 1 and 2, we'd add the 
offset for these two records to the offset writer, and we'd flush offset 
`partition=P1,offset=O2`. We'd end up with the following remaining in the queue 
(using the same record numbers as before):
   ```
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   4. SubmittedRecord{ SourceRecord{...partition=P2,offset=O1...}, acked=true}
   5. SubmittedRecord{ SourceRecord{...partition=P2,offset=O2...}, acked=true}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   7. SubmittedRecord{ SourceRecord{...partition=P2,offset=O3...}, acked=true}
   8. SubmittedRecord{ SourceRecord{...partition=P2,offset=O4...}, acked=true}
   9. SubmittedRecord{ SourceRecord{...partition=P2,offset=O5...}, acked=true}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   There are quite a few records with source partition `P2` that were acked but 
not dequeued, simply because they were behind an unacked record with a 
different source partition.
   
   However, if we dequeue all acked records with a source partition map that 
does not match a previously un-acked record, then we'd be able to dequeue more 
records and *also* flush offsets 
`partition=P1,offset=O2,partition=P2,offset=O7`. We'd end up with a much 
smaller queue (again, using the same record numbers as before):
   ```
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   This minor change will dramatically improve the ability to commit offsets 
closer to what has actually be acked. 
   ```
           // Dequeue all submitted records that have been acknowledged and 
don't have a source partition with an unacked record
           Set<Map<String, ?>> unackedPartitions = new HashSet<>();
           Iterator<SubmittedRecord> iter = submittedRecords.iterator();
           while (iter.hasNext()) {
               SubmittedRecord next = iter.next();
               SourceRecord record = next.record();
               Map<String, ?> partition = record.sourcePartition();
               if (next.isAcknowledged() && 
!unackedPartitions.contains(partition)) {
                   // The record is acknowledged and does not share a source 
partition with an unacknowledged record,
                   // so we can remove it from the queue and write the offsets 
to the offset writer.
                   iter.remove();
                   offsetWriter.offset(partition, record.sourceOffset());
               } else {
                   // As soon as we see an unacknowledged record, we have to 
prevent dequeuing all subsequent records that use that same partition
                   unackedPartitions.add(partition);
               }
           }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to