[ https://issues.apache.org/jira/browse/KAFKA-12736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lucas Bradstreet updated KAFKA-12736:
-------------------------------------
    Description:
When flush is called, a copy of the incomplete batches is made. This means that the full ProducerBatch(s) are held in memory until the flush has completed. For batches where the existing memory pool is used, this is less wasteful because the memory will already have been returned to the pool, but non-pool memory can only be GC'd after the flush has completed. Rather than use copyAll, we can make a new array with only the produceFuture(s) and await on those.

{code:java}
/**
 * Mark all partitions as ready to send and block until the send is complete
 */
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (ProducerBatch batch : this.incomplete.copyAll())
            batch.produceFuture.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}
{code}

This may help in cases where the application is already memory constrained and memory usage is slowing progress on completion of the incomplete batches.

  was:
When flush is called, a copy of the incomplete batches is made. This means that the full ProducerBatch(s) are held in memory until the flush has completed. For batches where the existing memory pool is used, this is less wasteful because the memory will already have been returned to the pool, but non-pool memory can only be GC'd after the flush has completed. Rather than use copyAll, we can make a new array with only the produceFuture(s) and await on those.
{code:java}
/**
 * Mark all partitions as ready to send and block until the send is complete
 */
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (ProducerBatch batch : this.incomplete.copyAll())
            batch.produceFuture.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}
{code}

> KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-12736
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12736
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Lucas Bradstreet
>            Priority: Minor
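
The change suggested in the description (snapshot only the futures, not the batches) could look roughly like the sketch below. It is a self-contained illustration, not the Kafka code: FlushSketch, Batch, ProduceFuture, copyFutures, add, complete and beginFlush are hypothetical stand-ins for the producer internals, and only awaitFlushCompletion and flushesInProgress mirror names from the snippet in the issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for the producer internals; not the real Kafka classes.
class FlushSketch {

    /** Stand-in for the object behind batch.produceFuture: a small latch. */
    static final class ProduceFuture {
        private final CountDownLatch latch = new CountDownLatch(1);
        void done() { latch.countDown(); }
        void await() throws InterruptedException { latch.await(); }
    }

    /** Stand-in for ProducerBatch: a large payload plus its small future. */
    static final class Batch {
        final byte[] payload;
        final ProduceFuture produceFuture = new ProduceFuture();
        Batch(int size) { this.payload = new byte[size]; }
    }

    private final List<Batch> incomplete = new ArrayList<>();
    final AtomicInteger flushesInProgress = new AtomicInteger();

    synchronized void add(Batch b) { incomplete.add(b); }

    /** Drop the batch from the incomplete set, then signal its future. */
    synchronized void complete(Batch b) {
        incomplete.remove(b);
        b.produceFuture.done();
    }

    /** Snapshot only the futures, so the snapshot does not reference the batches. */
    synchronized List<ProduceFuture> copyFutures() {
        List<ProduceFuture> futures = new ArrayList<>(incomplete.size());
        for (Batch b : incomplete)
            futures.add(b.produceFuture);
        return futures;
    }

    void beginFlush() { flushesInProgress.incrementAndGet(); }

    /** Same shape as the method in the issue, iterating futures instead of batches. */
    void awaitFlushCompletion() throws InterruptedException {
        try {
            for (ProduceFuture f : copyFutures())
                f.await();
        } finally {
            flushesInProgress.decrementAndGet();
        }
    }
}
```

With this shape, a batch that completes while the flush is still waiting on other batches is referenced only by whatever else holds it, so its non-pool buffer becomes eligible for GC before the flush returns. The snapshot still keeps one small future object per batch alive for the duration of the flush.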