rhauch edited a comment on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-910510910


   First of all, thanks for trying to fix this issue, @C0urante.
   
   And thanks for your insight, @hachikuji. I agree that it seems like we 
should not have to block the offset commits until the full batch of records has 
been written to Kafka.
   
   I suspect the current logic was written this way because it's the simplest 
thing to do, given that the source partition map and offset map in the source 
records are opaque, meaning we can't sort them and have to instead rely upon 
the order of the source records returned by the connector. And because the 
producer can make progress writing to some topic partitions while not making 
progress on others, it's possible that some records in a batch are written 
before earlier records in the same batch.
   
   The bottom line is that we have to track offsets that can be committed using 
only the order of the records that were generated by the source task. The 
current logic simply blocks committing offsets until each "batch" of records is 
completely flushed. That way we can commit _all_ of the offsets in the batch 
together, and let the offset writer rely upon ordering to use only the latest 
offset map for each partition map when we tell it to flush.
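   
   To make that concrete, here is a rough conceptual sketch (not the actual `OffsetStorageWriter` code): because offsets are keyed by the source partition map, applying them in record order means a later offset simply replaces an earlier one for the same partition, so only the latest offset per partition ends up being flushed. `recordsInTaskOrder` below is just a stand-in for a batch of acked records in the order the task produced them:
   ```
       // Conceptual sketch only: the real OffsetStorageWriter also converts and serializes offsets
       Map<Map<String, ?>, Map<String, ?>> latestOffsets = new HashMap<>();
       for (SourceRecord record : recordsInTaskOrder) {
           // A later record for the same source partition overwrites the earlier entry
           latestOffsets.put(record.sourcePartition(), record.sourceOffset());
       }
   ```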
   
   But, flushing offsets requires synchronization, and the current logic 
switches between the `outstandingMessages` and `outstandingMessagesBacklog` 
buffers to track the "batches" of records that have to complete for offset 
commits. It's really sort of a mess.
   
   @hachikuji wrote:
   > The patch gets around the problem by relaxing `offset.flush.timeout.ms` a 
little bit. Rather than treating expiration of records as a fatal error, we 
continue to allow more time for `outstandingMessages` to be drained. This 
ensures that we do not have to wait for the messages from 
`outstandingMessagesBacklog` which are added while the flush is in progress.
   
   That's my understanding, too. And maybe I don't grasp the subtleties of the fix, but it seems like the fix won't necessarily help when a producer is _consistently_ slow. In such cases, `outstandingMessages` will fill with the records sent to the producer since the previous offset commit, and as soon as we start committing offsets, all subsequent records get added to `outstandingMessagesBacklog`. If the producer writes records significantly slower than the source task generates them, then `outstandingMessagesBacklog` could be larger than `outstandingMessages` by the time the offsets for `outstandingMessages` are finally committed, especially since this change blocks offset commits even longer. So while we're able to eventually commit those first offsets, if the backlog is larger it will likely take longer for the producer to flush those records than it took to flush the first batch. The offset commit thread remains blocked for longer and longer periods of time.
   
   Fortunately, we do have back pressure to keep this from getting too far out of control: when the producer's buffer fills up, the worker source task's thread will block (up to `max.block.ms`) on calls to `producer.send(...)`, and the worker source task will retry any sends that fail after that timeout. And since this is the same thread that calls `poll()`, the worker source task will eventually slow its calls to `poll()`.
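   
   For reference, the existing retry path in `sendRecords()` looks roughly like this (a simplified sketch, not the exact code): `producer.send(...)` blocks for up to `max.block.ms` when the buffer is full, and a send that still fails with a retriable error is kept and retried on the next pass through the task loop:
   ```
               try {
                   producer.send(producerRecord, (recordMetadata, e) -> {
                       // ... existing callback ...
                   });
               } catch (RetriableException e) {
                   // The producer could not accept the record in time; keep the unsent records
                   // and retry them later, which also delays the next poll() on this same thread
                   log.warn("{} Failed to send record. Backing off before retrying", this);
                   toSend = toSend.subList(processed, toSend.size());
                   lastSendFailed = true;
                   counter.retryRemaining();
                   return false;
               }
   ```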
   
   But I think we can change how offsets are flushed such that we don't have to 
wait for the producer, and instead we can simply flush the latest offsets for 
records that have been successfully written at that point. We just need a 
different mechanism (other than the two "outstanding" lists and the 
flush-related flags) to track the offsets for the most recently written records.
   
   One way to do that is to use a single concurrent queue that bookkeeps records in the same order they were generated by the source task, but in a way that allows us to track _which_ records have been acked and tolerates those records being acked in any order.
   
   For example, we could replace the `outstandingMessages` and 
`outstandingMessagesBacklog` fields in `WorkerSourceTask` with something like 
this:
   ```
       private final Queue<SubmittedRecord> submittedRecords = new ConcurrentLinkedQueue<>();
   ```
   An element is appended to this queue just before the record is sent to the 
producer, and the `SubmittedRecord` class allows us to track which of these 
records has been acknowledged:
   ```
        protected static class SubmittedRecord {
            private final SourceRecord record;
            private final AtomicBoolean acked = new AtomicBoolean();

            public SubmittedRecord(SourceRecord sourceRecord) {
                record = Objects.requireNonNull(sourceRecord);
            }

            // Called from the producer callback once the record has been written to Kafka
            public void acknowledge() {
                acked.set(true);
            }

            // Safe to call from the offset commit thread
            public boolean isAcknowledged() {
                return acked.get();
            }

            public SourceRecord record() {
                return record;
            }
        }
   
   ```
    Here, `acknowledge()` is called from the producer callback, and the `commitOffsets()` method can safely call `isAcknowledged()` and `record()` from the commit thread. The `sendRecords()` method would add a `SubmittedRecord` to the end of the queue for each record that will be sent to the producer:
   ```
        private boolean sendRecords() {
            ...
            for (final SourceRecord preTransformRecord : toSend) {
                ...
                SubmittedRecord submittedRecord = new SubmittedRecord(record);
                if (!submittedRecords.offer(submittedRecord)) {
                    // If a blocking queue, then retry using the existing mechanism in WorkerSourceTask
                    log.warn("{} Failed to add record to buffer. Backing off before retrying", this);
                    toSend = toSend.subList(processed, toSend.size());
                    lastSendFailed = true;
                    counter.retryRemaining();
                    return false;
                }
                ...
   ```
   and then have the producer callback call the `SubmittedRecord.acknowledge()` 
method:
   ```
               try {
                   ...
                   producer.send(
                       producerRecord,
                       (recordMetadata, e) -> {
                           if (e != null) {
                               ...
                           } else {
                               submittedRecord.acknowledge();
                               ...
                           }
                       });
   ```
    This effectively replaces `outstandingMessages`, `outstandingMessagesBacklog`, and the `flushing` flag, and it simplifies the logic in `sendRecords()`, which no longer has to know which of those to use.
   
    Then here's the big change: in `commitOffsets()`, we can dequeue all acked records, take a snapshot of offsets, and immediately flush offsets without waiting for the producer. And by using a concurrent queue, we don't even need to synchronize between the `sendRecords()` method adding to the back of the queue and the `commitOffsets()` method pulling from the front of the queue.
   ```
        public boolean commitOffsets() {
            ...
            // Dequeue all submitted records that have been acknowledged
            while (!submittedRecords.isEmpty()) {
                SubmittedRecord next = submittedRecords.peek();
                if (!next.isAcknowledged()) {
                    // This record is not yet acknowledged, so we can't process any more offsets
                    break;
                }
                submittedRecords.poll();
                // The record is acknowledged, so add its offsets to the offset writer
                // Offsets are converted & serialized in the OffsetWriter
                SourceRecord record = next.record();
                offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
            }
            ...
            synchronized (this) {
                boolean flushStarted = offsetWriter.beginFlush();
                if (!flushStarted) {
                    ...
                }
            }

            // Now we can actually flush the offsets to user storage.
            Future<Void> flushFuture = offsetWriter.doFlush((error, result) -> {
                if (error != null) {
                    log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error);
                } else {
                    log.trace("{} Finished flushing offsets to storage", WorkerSourceTask.this);
                }
            });
            ...
        }
   ```
   
    I've shown the snippet above using a non-blocking queue of unlimited size. I think we could do this because the existing `WorkerSourceTask` logic already handles the possibility that `producer.send(...)` blocks when its buffer is full, for up to `max.block.ms` before throwing a retriable exception, and then retries the send if needed. Since this happens on the same thread that calls `SourceTask.poll()`, this existing logic already provides backpressure that is based upon the producer settings and that prevents the source task from getting too far ahead of the producer.
   
   Alternatively, we could use a blocking queue, but this would require an 
additional worker configuration, which is not ideal and can't be backported.
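   
   For example, a bounded variant might look something like the following. This is purely a sketch: the config name is invented just to illustrate why a new worker setting would be needed, and `offer(...)` returning false would then feed into the retry path shown in `sendRecords()` above.
   ```
       // Hypothetical sketch: a bounded queue needs a capacity, which would require a new worker config.
       // "offset.tracking.queue.size" is an invented name, not an existing Connect configuration.
       private final BlockingQueue<SubmittedRecord> submittedRecords =
               new LinkedBlockingQueue<>(workerConfig.getInt("offset.tracking.queue.size"));
   ```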
   
   @C0urante, WDYT?

