----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31850/#review79379 -----------------------------------------------------------
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java <https://reviews.apache.org/r/31850/#comment128657> joint -> join clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java <https://reviews.apache.org/r/31850/#comment128660> Just to be clear here, I think we can just say "fail any pending send requests". That is equivalent to closing forcefully. The issue is that after this we try to join the sender thread (if called from user thread) so that is not quite closing forcefully. That is actually a graceful close. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java <https://reviews.apache.org/r/31850/#comment128661> so then this becomes: When timeout = 0, this method fails all pending send requests and: <ul> <li> if the method was invoked from the user thread, it will wait for the sender thread to gracefully exit.</li> <li> if the method was invoked from the producer callback, it will return immediately without waiting for the sender thread to exit.</li> clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java <https://reviews.apache.org/r/31850/#comment128662> max -> maximum for _the_ producer _to_ complete _any pending_ send requests. non negative -> non-negative clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java <https://reviews.apache.org/r/31850/#comment128664> Specifying a timeout of zero means do not wait for pending send requests to complete. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java <https://reviews.apache.org/r/31850/#comment128666> This should probably be info clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java <https://reviews.apache.org/r/31850/#comment128665> This should probably be moved to the else block. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java <https://reviews.apache.org/r/31850/#comment128667> Can we make this clearer? e.g., "Proceeding to force close the producer since pending requests could not be completed within timeout {}..." clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java <https://reviews.apache.org/r/31850/#comment128668> Thanks for catching this issue, but can you explain it more clearly in the comment? i.e., "append is atomic to close" does not really make sense and the "last batch is missed" is not fully explained. More importantly, Guozhang found an issue with the locking approach that he can comment on. Also, general comment on the approach: it is slightly weird to see the closeLock in the code. I'm wondering if we really need to bother with it. i.e., sure there may be some futures returned to the client, but once close has been called, the client probably should not bother to call future.get. Perhaps that is not a valid assumption if they check request satisfaction in separate threads. clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java <https://reviews.apache.org/r/31850/#comment128671> Similar comments as above. Also, since this is public we should probably still acquire the read lock. clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java <https://reviews.apache.org/r/31850/#comment128673> Similar comment as above. Once all accesses of closed are protected by the lock then we should perhaps remove the volatile qualifier. - Joel Koshy On April 8, 2015, 1:18 a.m., Jiangjie Qin wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/31850/ > ----------------------------------------------------------- > > (Updated April 8, 2015, 1:18 a.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1660 > https://issues.apache.org/jira/browse/KAFKA-1660 > > > Repository: kafka > > > Description > ------- > > A minor fix. > > > Incorporated Guozhang's comments. > > > Modify according to the latest conclusion. > > > Patch for the finally passed KIP-15git status > > > Addressed Joel and Guozhang's comments. > > > rebased on trunk > > > Rebase on trunk > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > b91e2c52ed0acb1faa85915097d97bafa28c413a > clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java > 6913090af03a455452b0b5c3df78f266126b3854 > clients/src/main/java/org/apache/kafka/clients/producer/Producer.java > 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java > 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java > 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 > > clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java > fee322fa0dd9704374db4a6964246a7d2918d3e4 > clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java > c2fdc23239bd2196cd912c3d121b591f21393eab > > clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java > 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c > core/src/test/scala/integration/kafka/api/ProducerSendTest.scala > 9811a2b2b1e9bf1beb301138f7626e12d275a8db > > Diff: https://reviews.apache.org/r/31850/diff/ > > > Testing > ------- > > Unit tests passed. > > > Thanks, > > Jiangjie Qin > >