----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29379/#review66709 -----------------------------------------------------------
I think the basic approach in this patch looks sound and should work fine independent of any fixes to how metadata/leader info is retrieved (as discussed in the JIRA). However, it still needs some cleanup and fixes. clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java <https://reviews.apache.org/r/29379/#comment110290> deque is not threadsafe, you'll need a synchronized block. Since both branches of this if now require that and call deque.peekFirst, you might just want to pull that code out into the surrounding block. clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java <https://reviews.apache.org/r/29379/#comment110292> Looks like this is just a leftover setting from development? This should be using this.batchExpirationMs clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java <https://reviews.apache.org/r/29379/#comment110294> Normally this sequence of batch.done() and deallocate() is called from Sender.completeBatch(), which also calls Sender.sensors.recordErrors() when there was an error, as there was in this case. Any way to rework this so the error can be properly recorded in metrics? clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java <https://reviews.apache.org/r/29379/#comment110295> This would probably be clearer if it was just batchExpirationMs. - Ewen Cheslack-Postava On Dec. 23, 2014, 8:44 p.m., Parth Brahmbhatt wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/29379/ > ----------------------------------------------------------- > > (Updated Dec. 23, 2014, 8:44 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1788 > https://issues.apache.org/jira/browse/KAFKA-1788 > > > Repository: kafka > > > Description > ------- > > KAFKA-1788: Adding configuration for batch.expiration.ms to ensure batches > for partitions that has no leader do not stay in accumulator forever. > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > f61efb35db7e0de590556e6a94a7b5cb850cdae9 > clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java > a893d88c2f4e21509b6c70d6817b4b2cdd0fd657 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java > c15485d1af304ef53691d478f113f332fe67af77 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java > dd0af8aee98abed5d4a0dc50989e37888bb353fe > > clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java > 2c9932401d573549c40f16fda8c4e3e11309cb85 > clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java > ef2ca65cabe97b909f17b62027a1bb06827e88fe > > Diff: https://reviews.apache.org/r/29379/diff/ > > > Testing > ------- > > Unit test added. > > > Thanks, > > Parth Brahmbhatt > >