huangyiming created KAFKA-10903:
-----------------------------------

             Summary: Optimize producerBatch order performance
                 Key: KAFKA-10903
                 URL: https://issues.apache.org/jira/browse/KAFKA-10903
             Project: Kafka
          Issue Type: Improvement
            Reporter: huangyiming


When producer batches must be kept ordered by sequence, the current code 
compares the new batch with the first batch in the deque. If the first batch 
has a lower sequence than the new batch, we walk the deque from the front and 
insert the new batch at the right position.

Like this:
{code:java}
ProducerBatch firstBatchInQueue = deque.peekFirst();
if (firstBatchInQueue != null && firstBatchInQueue.hasSequence()
        && firstBatchInQueue.baseSequence() < batch.baseSequence()) {
    List<ProducerBatch> orderedBatches = new ArrayList<>();
    while (deque.peekFirst() != null && deque.peekFirst().hasSequence()
            && deque.peekFirst().baseSequence() < batch.baseSequence())
        orderedBatches.add(deque.pollFirst());

    log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " +
            "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size());
    // Either we have reached a point where there are batches without a sequence (ie. never been drained
    // and are hence in order by default), or the batch at the front of the queue has a sequence greater
    // than the incoming batch. This is the right place to add the incoming batch.
    deque.addFirst(batch);

    // Now we have to re insert the previously queued batches in the right order.
    for (int i = orderedBatches.size() - 1; i >= 0; --i) {
        deque.addFirst(orderedBatches.get(i));
    }

    // At this point, the incoming batch has been queued in the correct place according to its sequence.
}
{code}
I think we can first compare the new batch with the last batch in the deque: 
if the new batch's sequence is greater than or equal to that of the last 
batch, we can simply append the new batch to the end of the deque and skip 
the loop.

Like this:
{code:java}
ProducerBatch lastBatchInQueue = deque.peekLast();
if (lastBatchInQueue != null && lastBatchInQueue.hasSequence()
        && lastBatchInQueue.baseSequence() <= batch.baseSequence()) {
    deque.addLast(batch);
} else {
    List<ProducerBatch> orderedBatches = new ArrayList<>();
    while (deque.peekFirst() != null && deque.peekFirst().hasSequence()
            && deque.peekFirst().baseSequence() < batch.baseSequence())
        orderedBatches.add(deque.pollFirst());

    log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " +
            "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size());
    // Either we have reached a point where there are batches without a sequence (ie. never been drained
    // and are hence in order by default), or the batch at the front of the queue has a sequence greater
    // than the incoming batch. This is the right place to add the incoming batch.
    deque.addFirst(batch);

    // Now we have to re insert the previously queued batches in the right order.
    for (int i = orderedBatches.size() - 1; i >= 0; --i) {
        deque.addFirst(orderedBatches.get(i));
    }

    // At this point, the incoming batch has been queued in the correct place according to its sequence.
}
{code}
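
To make the tail check easier to try in isolation, here is a rough, 
self-contained sketch of it. It is not the actual producer code: Batch is a 
hypothetical stand-in for ProducerBatch that models only hasSequence() and 
baseSequence(), the NO_SEQUENCE value of -1 is an assumption of the sketch, 
and logging is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class SequenceInsertSketch {
    // Hypothetical stand-in for ProducerBatch (assumption: only these members matter here).
    static final class Batch {
        static final int NO_SEQUENCE = -1; // assumed marker for "never drained"
        private final int baseSequence;

        Batch(int baseSequence) { this.baseSequence = baseSequence; }

        boolean hasSequence() { return baseSequence != NO_SEQUENCE; }

        int baseSequence() { return baseSequence; }
    }

    // Try the tail first; fall back to the existing front scan otherwise.
    static void insertInSequenceOrder(Deque<Batch> deque, Batch batch) {
        Batch last = deque.peekLast();
        if (last != null && last.hasSequence() && last.baseSequence() <= batch.baseSequence()) {
            // Fast path: the new batch belongs at the end, so nothing has to be moved.
            deque.addLast(batch);
            return;
        }
        // Slow path: pull off every sequenced batch that sorts before the new one.
        List<Batch> smaller = new ArrayList<>();
        while (deque.peekFirst() != null && deque.peekFirst().hasSequence()
                && deque.peekFirst().baseSequence() < batch.baseSequence())
            smaller.add(deque.pollFirst());
        deque.addFirst(batch);
        // Put the removed batches back in front, preserving their original order.
        for (int i = smaller.size() - 1; i >= 0; --i)
            deque.addFirst(smaller.get(i));
    }

    // Helper for inspecting the resulting order.
    static List<Integer> sequences(Deque<Batch> deque) {
        List<Integer> out = new ArrayList<>();
        for (Batch b : deque)
            out.add(b.baseSequence());
        return out;
    }

    public static void main(String[] args) {
        Deque<Batch> deque = new ArrayDeque<>();
        for (int seq : new int[] {0, 10, 20})
            deque.addLast(new Batch(seq));

        insertInSequenceOrder(deque, new Batch(30)); // takes the fast path
        insertInSequenceOrder(deque, new Batch(15)); // falls back to the front scan
        System.out.println(sequences(deque));        // prints [0, 10, 15, 20, 30]
    }
}
```

When the new batch belongs at the tail, the check avoids polling and 
re-adding every batch ahead of it. In every other case, including a deque 
whose last batch has no sequence yet, the sketch behaves like the existing 
front scan.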



--
This message was sent by Atlassian Jira
(v8.3.4#803005)