-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79379
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128657>

    joint -> join



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128660>

    Just to be clear here, I think we can just say "fail any pending send 
requests". That is equivalent to closing forcefully. The issue is that after 
this we try to join the sender thread (if called from user thread) so that is 
not quite closing forcefully. That is actually a graceful close.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128661>

    so then this becomes:
    
    When timeout = 0, this method fails all pending send requests and:
    <ul>
    <li> if the method was invoked from the user thread, it will wait for the 
sender thread to gracefully exit.</li>
    <li> if the method was invoked from the producer callback, it will return 
immediately without waiting for the sender thread to exit.</li>



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128662>

    max -> maximum
    for _the_ producer _to_ complete _any pending_ send requests.
    non negative -> non-negative



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128664>

    Specifying a timeout of zero means do not wait for pending send requests to 
complete.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128666>

    This should probably be info



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128665>

    This should probably be moved to the else block.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128667>

    Can we make this clearer? e.g., "Proceeding to force close the producer 
since pending requests could not be completed within timeout {}..."



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment128668>

    Thanks for catching this issue, but can you explain it more clearly in the 
comment? i.e., "append is atomic to close" does not really make sense and the 
"last batch is missed" is not fully explained. More importantly, Guozhang found 
an issue with the locking approach that he can comment on.
    
    Also, general comment on the approach: it is slightly weird to see the 
closeLock in the code. I'm wondering if we really need to bother with it. i.e., 
sure there may be some futures returned to the client, but once close has been 
called, the client probably should not bother to call future.get. Perhaps that 
is not a valid assumption if they check request satisfaction in separate 
threads.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment128671>

    Similar comments as above.
    
    Also, since this is public we should probably still acquire the read lock.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment128673>

    Similar comment as above. Once all accesses of closed are protected by the 
lock then we should perhaps remove the volatile qualifier.


- Joel Koshy


On April 8, 2015, 1:18 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 8, 2015, 1:18 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15git status
> 
> 
> Addressed Joel and Guozhang's comments.
> 
> 
> rebased on trunk
> 
> 
> Rebase on trunk
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> b91e2c52ed0acb1faa85915097d97bafa28c413a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
> 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   
> clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java 
> fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
> c2fdc23239bd2196cd912c3d121b591f21393eab 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
>  05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>

Reply via email to