-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29379/#review66709
-----------------------------------------------------------


I think the basic approach in this patch looks sound and should work fine 
independent of any fixes to how metadata/leader info is retrieved (as discussed 
in the JIRA). However, it still needs some cleanup and fixes.


clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/29379/#comment110290>

    deque is not threadsafe, you'll need a synchronized block. Since both 
branches of this if now require that and call deque.peekFirst, you might just 
want to pull that code out into the surrounding block.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/29379/#comment110292>

    Looks like this is just a leftover setting from development? This should be 
using this.batchExpirationMs



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/29379/#comment110294>

    Normally this sequence of batch.done() and deallocate() is called from 
Sender.completeBatch(), which also calls Sender.sensors.recordErrors() when 
there was an error, as there was in this case. Any way to rework this so the 
error can be properly recorded in metrics?



clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
<https://reviews.apache.org/r/29379/#comment110295>

    This would probably be clearer if it was just batchExpirationMs.


- Ewen Cheslack-Postava


On Dec. 23, 2014, 8:44 p.m., Parth Brahmbhatt wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29379/
> -----------------------------------------------------------
> 
> (Updated Dec. 23, 2014, 8:44 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1788
>     https://issues.apache.org/jira/browse/KAFKA-1788
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1788: Adding configuration for batch.expiration.ms to ensure batches 
> for partitions that has no leader do not stay in accumulator forever.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> f61efb35db7e0de590556e6a94a7b5cb850cdae9 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> a893d88c2f4e21509b6c70d6817b4b2cdd0fd657 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  c15485d1af304ef53691d478f113f332fe67af77 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  2c9932401d573549c40f16fda8c4e3e11309cb85 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 
> ef2ca65cabe97b909f17b62027a1bb06827e88fe 
> 
> Diff: https://reviews.apache.org/r/29379/diff/
> 
> 
> Testing
> -------
> 
> Unit test added. 
> 
> 
> Thanks,
> 
> Parth Brahmbhatt
> 
>

Reply via email to