rhauch edited a comment on pull request #10112: URL: https://github.com/apache/kafka/pull/10112#issuecomment-910540773
My previous suggestion simply dequeues all completed records until an unacked record is found. This is really straightforward, but we could try to do better. We could dequeue all acked records except those whose source partition has an earlier record that has not been acked. For example, let's say we have enqueued 10 records when `commitOffsets()` is called:

```
1. SubmittedRecord{ SourceRecord{...partition=P1,offset=O1...}, acked=true}
2. SubmittedRecord{ SourceRecord{...partition=P1,offset=O2...}, acked=true}
3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
4. SubmittedRecord{ SourceRecord{...partition=P2,offset=O1...}, acked=true}
5. SubmittedRecord{ SourceRecord{...partition=P2,offset=O2...}, acked=true}
6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
7. SubmittedRecord{ SourceRecord{...partition=P2,offset=O3...}, acked=true}
8. SubmittedRecord{ SourceRecord{...partition=P2,offset=O4...}, acked=true}
9. SubmittedRecord{ SourceRecord{...partition=P2,offset=O5...}, acked=true}
10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
```

This might happen if records 4, 5, and 7-10 were written to different topic partitions than records 1, 2, 3, and 6, and the producer is stuck on the latter partitions.

With the simplistic logic, we'd only dequeue records 1 and 2, we'd add the offsets for these two records to the offset writer, and we'd flush offset `partition=P1,offset=O2`. We'd end up with the following remaining in the queue (using the same record numbers as before):

```
3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
4. SubmittedRecord{ SourceRecord{...partition=P2,offset=O1...}, acked=true}
5. SubmittedRecord{ SourceRecord{...partition=P2,offset=O2...}, acked=true}
6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
7. SubmittedRecord{ SourceRecord{...partition=P2,offset=O3...}, acked=true}
8. SubmittedRecord{ SourceRecord{...partition=P2,offset=O4...}, acked=true}
9. SubmittedRecord{ SourceRecord{...partition=P2,offset=O5...}, acked=true}
10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
```

There are quite a few records with source partition `P2` that were acked but not dequeued, simply because they were behind an unacked record with a different source partition.

However, if we dequeue all acked records whose source partition map does not match that of a previously seen unacked record, then we'd be able to dequeue more records and *also* flush offsets `partition=P1,offset=O2` and `partition=P2,offset=O5` (record 9 is the last acked `P2` record ahead of the unacked record 10). We'd end up with a much smaller queue (again, using the same record numbers as before):

```
3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
```

This minor change will dramatically improve the ability to commit offsets closer to what has actually been acked:

```
// Dequeue all submitted records that have been acknowledged and don't have a source partition with an unacknowledged record
Set<Map<String, ?>> unackedPartitions = new HashSet<>();
// Walk the whole queue with an iterator: peek()/poll() only ever see the head, so they could never skip past an unacked record.
// This assumes submittedRecords is a collection whose iterator supports remove() (e.g. a Deque).
Iterator<SubmittedRecord> iter = submittedRecords.iterator();
while (iter.hasNext()) {
    SubmittedRecord next = iter.next();
    SourceRecord record = next.record();
    Map<String, ?> partition = record.sourcePartition();
    if (next.isAcknowledged() && !unackedPartitions.contains(partition)) {
        iter.remove();
        // The record is acknowledged, so add the offsets to the offset writer
        // Offsets are converted & serialized in the OffsetWriter
        offsetWriter.offset(partition, record.sourceOffset());
    } else {
        // As soon as we see an unacknowledged record, we have to prevent dequeuing all subsequent records that use that same partition
        unackedPartitions.add(partition);
    }
}
```
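To sanity-check the selective dequeue against the 10-record example, here is a minimal, self-contained sketch. It is not the Connect runtime code: `Submitted`, `commit`, and `exampleQueue` are hypothetical names introduced only for illustration, source partitions and offsets are plain strings rather than `Map<String, ?>`, and a `LinkedHashMap` stands in for the offset writer.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class SelectiveDequeueSketch {

    // Hypothetical stand-in for SubmittedRecord: source partition, source offset, ack flag.
    static final class Submitted {
        final String partition;
        final String offset;
        final boolean acked;

        Submitted(String partition, String offset, boolean acked) {
            this.partition = partition;
            this.offset = offset;
            this.acked = acked;
        }

        @Override
        public String toString() {
            return partition + "/" + offset + (acked ? " acked" : " unacked");
        }
    }

    // Removes every acked record whose source partition has no earlier unacked record,
    // and returns the latest committable offset per source partition.
    static Map<String, String> commit(Deque<Submitted> queue) {
        Map<String, String> committable = new LinkedHashMap<>(); // stand-in for the offset writer
        Set<String> unackedPartitions = new HashSet<>();
        Iterator<Submitted> iter = queue.iterator();
        while (iter.hasNext()) {
            Submitted next = iter.next();
            if (next.acked && !unackedPartitions.contains(next.partition)) {
                iter.remove();
                committable.put(next.partition, next.offset); // later offsets overwrite earlier ones
            } else {
                // Block all later records that share this source partition.
                unackedPartitions.add(next.partition);
            }
        }
        return committable;
    }

    // The 10 records from the example, in submission order.
    static Deque<Submitted> exampleQueue() {
        Deque<Submitted> queue = new ArrayDeque<>();
        queue.add(new Submitted("P1", "O1", true));
        queue.add(new Submitted("P1", "O2", true));
        queue.add(new Submitted("P1", "O3", false));
        queue.add(new Submitted("P2", "O1", true));
        queue.add(new Submitted("P2", "O2", true));
        queue.add(new Submitted("P1", "O4", false));
        queue.add(new Submitted("P2", "O3", true));
        queue.add(new Submitted("P2", "O4", true));
        queue.add(new Submitted("P2", "O5", true));
        queue.add(new Submitted("P2", "O6", false));
        return queue;
    }

    public static void main(String[] args) {
        Deque<Submitted> queue = exampleQueue();
        Map<String, String> offsets = commit(queue);
        System.out.println("committable offsets: " + offsets);
        System.out.println("remaining in queue:  " + queue);
    }
}
```

In this sketch the queue shrinks from 10 records to 3 (records 3, 6, and 10), and the committable offsets are `O2` for `P1` and `O5` for `P2`. The cost relative to the simple head-only approach is a full pass over the queue on each commit instead of stopping at the first unacked record.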