-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review75742
-----------------------------------------------------------

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment122980>

    I think so. It is synchronized underneath and idempotent, from what I can see.


clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
<https://reviews.apache.org/r/31850/#comment122994>

    Thought about this again. Actually, we should not invoke initiateClose in producer.close(timeout) when the timeout is negative. I'll change the code in close(timeout) and leave this code as is. It is also clearer because, from a semantic point of view, forceClose should do everything.


core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
<https://reviews.apache.org/r/31850/#comment122997>

    Probably we cannot, because in this case the producer gets closed in each iteration, so we have to create a new one.


- Jiangjie Qin


On March 9, 2015, 7:56 p.m., Jiangjie Qin wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
>
> (Updated March 9, 2015, 7:56 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
>
>
> Repository: kafka
>
>
> Description
> -------
>
> A minor fix.
>
>
> Incorporated Guozhang's comments.
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 7397e565fd865214529ffccadd4222d835ac8110
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java d5c79e2481d5e9a2524ac2ef6a6879f61cb7cb5f
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ed9c63a6679e3aaf83d19fde19268553a4c107c2
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java c1bc40648479d4c2ae4ac52f40dadc070a6bcf6f
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 3df450784592b894008e7507b2737f9bb07f7bd2
>
> Diff: https://reviews.apache.org/r/31850/diff/
>
>
> Testing
> -------
>
> Unit tests passed.
>
>
> Thanks,
>
> Jiangjie Qin
>
>
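
For reference, the close(timeout) change described in the Sender.java comment might look roughly like the sketch below. This is only an illustration, not the code in the diff: SketchSender, SketchProducer and CloseSketchDemo are hypothetical stand-ins, and treating a negative timeout as "force close" is an assumption read from the comment rather than something the patch is confirmed to do. The point it shows is that forceClose performs the graceful steps itself, so close(timeout) never needs to call initiateClose on the forced path.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the sender thread; NOT the real Sender class.
class SketchSender {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final AtomicBoolean forced = new AtomicBoolean(false);
    private final AtomicInteger initiateCloseCalls = new AtomicInteger();

    // Graceful shutdown: stop accepting new work and let pending records drain.
    // Calling it more than once leaves the sender in the same state.
    void initiateClose() {
        initiateCloseCalls.incrementAndGet();
        running.set(false);
    }

    // Forced shutdown: mark pending records as abandoned, then do everything
    // initiateClose() does, so callers only need this single call.
    void forceClose() {
        forced.set(true);
        initiateClose();
    }

    boolean isRunning() { return running.get(); }
    boolean isForceClosed() { return forced.get(); }
    int initiateCloseCalls() { return initiateCloseCalls.get(); }
}

// Hypothetical stand-in for the producer; NOT the real KafkaProducer.
class SketchProducer {
    private final SketchSender sender;

    SketchProducer(SketchSender sender) {
        this.sender = sender;
    }

    // Assumption: a negative timeout requests a forced close.
    void close(long timeoutMs) {
        if (timeoutMs < 0) {
            // Forced path: do not call initiateClose() here; forceClose() covers it.
            sender.forceClose();
        } else {
            sender.initiateClose();
            // A real implementation would now wait up to timeoutMs
            // for the sender thread to finish; omitted in this sketch.
        }
    }
}

class CloseSketchDemo {
    public static void main(String[] args) {
        SketchSender sender = new SketchSender();
        new SketchProducer(sender).close(-1);
        System.out.println("forced=" + sender.isForceClosed()
                + " running=" + sender.isRunning()
                + " initiateCloseCalls=" + sender.initiateCloseCalls());
    }
}
```

Keeping the branch in close(timeout) means initiateClose runs exactly once on either path, and Sender.forceClose stays self-sufficient for any other caller.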