[ https://issues.apache.org/jira/browse/KAFKA-12870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson resolved KAFKA-12870.
-------------------------------------
    Resolution: Fixed

> RecordAccumulator stuck in a flushing state
> -------------------------------------------
>
>                 Key: KAFKA-12870
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12870
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer, streams
>    Affects Versions: 2.5.1, 2.8.0, 2.7.1, 2.6.2
>            Reporter: Niclas Lockner
>            Assignee: Jason Gustafson
>            Priority: Major
>             Fix For: 3.0.0, 2.8.1
>
>         Attachments: RecordAccumulator.log, full.log
>
>
> After a Kafka Stream with exactly-once enabled has performed its first
> commit, the RecordAccumulator within the stream's internal producer gets
> stuck in a state where every subsequently allocated ProducerBatch is
> immediately flushed instead of being held in memory until it expires,
> regardless of the stream's linger or batch size configuration.
> This is reproduced in the example code found at
> [https://github.com/niclaslockner/kafka-12870], which can be run with
> ./gradlew run --args=<bootstrap servers>
> The example has a producer that sends 1 record/sec to one topic, and a Kafka
> stream with EOS enabled that forwards the records from that topic to another
> topic with the configuration linger = 5 sec, commit interval = 10 sec.
>
> The expected behavior when running the example is that the stream's
> ProducerBatches will expire (or get flushed because of the commit) every
> 5 seconds, and that the stream's producer will send a ProduceRequest every
> 5 seconds with an expired ProducerBatch that contains 5 records.
> The actual behavior is that each ProducerBatch is made immediately available
> to the Sender, and the Sender sends one ProduceRequest per record.
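For reference, the reproduction's stream configuration can be sketched as below. This is a minimal, hypothetical sketch using plain string keys (`processing.guarantee`, `commit.interval.ms`, and the `producer.`-prefixed pass-through); the actual repro project is in the linked repository, and the application id here is invented:

```java
import java.util.Properties;

public class ReproConfig {
    // Builds Streams properties matching the reported setup:
    // EOS on, commit interval 10 s, internal producer linger 5 s.
    public static Properties streamsProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", "kafka-12870-repro"); // hypothetical id
        props.put("bootstrap.servers", bootstrapServers);
        props.put("processing.guarantee", "exactly_once");
        props.put("commit.interval.ms", "10000");
        // "producer." prefix forwards the setting to the internal producer
        props.put("producer.linger.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        Properties p = streamsProps("localhost:9092");
        System.out.println(p.getProperty("producer.linger.ms"));
    }
}
```

With these settings one would expect batches of roughly 5 records (1 record/sec against a 5 s linger); the bug instead produces one ProduceRequest per record.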
>
> The example code contains a copy of the RecordAccumulator class (copied
> from kafka-clients 2.8.0) with some additional logging added to
> * RecordAccumulator#ready(Cluster, long)
> * RecordAccumulator#beginFlush()
> * RecordAccumulator#awaitFlushCompletion()
> These log entries show (see the attached RecordAccumulator.log)
> * that the batches are considered sendable because a flush is in progress
> * that Sender.maybeSendAndPollTransactionalRequest() calls
> RecordAccumulator's beginFlush() without also calling
> awaitFlushCompletion(), and that this makes RecordAccumulator's
> flushesInProgress jump between 1 and 2 instead of the expected 0 and 1.
>
> This issue is not reproducible in version 2.3.1 or 2.4.1.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
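The counter imbalance described above can be modeled with a minimal sketch. This is not the actual RecordAccumulator code, only an illustration under the assumption (matching the report) that `beginFlush()` increments a counter, `awaitFlushCompletion()` decrements it, and batches are treated as sendable whenever the counter is above zero:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of the flush bookkeeping; names mirror the 2.8.0 class.
class FlushCounter {
    private final AtomicInteger flushesInProgress = new AtomicInteger(0);

    void beginFlush() { flushesInProgress.incrementAndGet(); }

    // The paired call that the transactional Sender path skipped.
    void awaitFlushCompletion() { flushesInProgress.decrementAndGet(); }

    // While true, ready() considers every batch immediately sendable.
    boolean flushInProgress() { return flushesInProgress.get() > 0; }
}

public class FlushDemo {
    public static void main(String[] args) {
        FlushCounter acc = new FlushCounter();

        // Balanced usage: counter goes 0 -> 1 -> 0, flush state clears.
        acc.beginFlush();
        acc.awaitFlushCompletion();
        System.out.println(acc.flushInProgress());

        // Unbalanced usage, as in the report: an extra beginFlush() with
        // no matching awaitFlushCompletion() leaves the counter oscillating
        // between 1 and 2 instead of 0 and 1, so the flushing state never
        // clears and every new batch bypasses linger/batch-size batching.
        acc.beginFlush();
        acc.beginFlush();
        acc.awaitFlushCompletion();
        System.out.println(acc.flushInProgress());
    }
}
```

This is why the linger and batch-size settings appear ignored after the first commit: the accumulator permanently reports a flush in progress.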