> On April 11, 2015, 8:02 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 530
> > <https://reviews.apache.org/r/31850/diff/8/?file=923924#file923924line530>
> >
> >     I think this section is very confusing. I don't think most people will differentiate between immediately exiting vs waiting for 0 ms and then exiting, since after all isn't waiting 0 ms the same as immediately exiting.

Thanks for the review, Jay. Please see the replies below.

The point we want to convey here is that when timeout = 0, the behavior differs depending on the calling context: if close(0) is invoked from a user thread, it will still try to join the sender thread; if it is invoked from the sender thread itself, it will not try to join itself. That is what we meant by "immediately". The sketch below illustrates the distinction.
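For illustration, here is a minimal, self-contained sketch of the kind of dispatch described above. It is not the actual patch: the class, the SenderStub interface, and the helper names (initiateClose, forceClose, ioThread) are assumptions made for the purpose of discussion.

```java
import java.util.concurrent.TimeUnit;

// Illustrative sketch only, not the actual patch. All names here are
// simplified stand-ins for the purpose of discussion.
public class ProducerCloseSketch {
    interface SenderStub {
        void initiateClose(); // ask the sender loop to drain in-flight work and exit
        void forceClose();    // abort incomplete requests and exit
    }

    private final SenderStub sender;
    private final Thread ioThread; // the sender (I/O) thread

    ProducerCloseSketch(SenderStub sender, Thread ioThread) {
        this.sender = sender;
        this.ioThread = ioThread;
    }

    public void close(long timeout, TimeUnit timeUnit) throws InterruptedException {
        // A callback runs on the sender thread, so close() invoked from a
        // callback must never join: a thread cannot wait for itself to die.
        boolean invokedFromSenderThread = Thread.currentThread() == ioThread;
        if (timeout > 0 && !invokedFromSenderThread) {
            // Graceful path: signal shutdown and wait up to the timeout.
            sender.initiateClose();
            ioThread.join(timeUnit.toMillis(timeout));
        }
        if (ioThread.isAlive()) {
            // timeout == 0, or the graceful wait expired: force close.
            sender.forceClose();
            // Join only when called from a user thread. From the sender
            // thread we return immediately; this is the "immediate" exit,
            // as opposed to "wait 0 ms and then exit".
            if (!invokedFromSenderThread)
                ioThread.join();
        }
    }
}
```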
> On April 11, 2015, 8:02 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 539
> > <https://reviews.apache.org/r/31850/diff/8/?file=923924#file923924line539>
> >
> >     It wouldn't block forever, that isn't correct, it would just block for the period of time they specified.

We are saying that when close(timeout) is invoked from the sender thread, we call close(0) instead, and we do this precisely to *avoid* blocking forever.

> On April 11, 2015, 8:02 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 157
> > <https://reviews.apache.org/r/31850/diff/8/?file=923927#file923927line157>
> >
> >     I think to be correct the check for whether the producer is closed should happen before we consider an append in progress since you loop on that check later.

Yes, the current solution is based on the assumption that once a thread receives an IllegalStateException because the producer is closed, it will not call send() again. The problem with checking the close flag before incrementing appendsInProgress is: what if close() is invoked from another thread after the close-flag check but before appendsInProgress is incremented? In that case we might miss the last message or batch.

> On April 11, 2015, 8:02 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 155
> > <https://reviews.apache.org/r/31850/diff/8/?file=923927#file923927line155>
> >
> >     This scheme is clever but non-obvious, is there a simpler way?

I'm not sure there is a simpler way. Maybe we can review the current approach again and see if we can simplify it. The goals we want to achieve here are:

1. Once abortIncompleteBatches() finishes, no more messages can be appended.
2. When hasUnsent() returns false, it has not missed any batch.

The current solutions for both depend on setting the close flag first. To achieve (1), the implementation sets the close flag and then waits until all ongoing appends (if any) have finished. To achieve (2), the implementation synchronizes on the deque: when an append grabs the deque lock, it first checks whether the close flag is set. If it is, hasUnsent() might already have checked this deque, so it is no longer safe to append a new batch; otherwise it is safe to append. The sketch below illustrates the interplay.
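For concreteness, here is a minimal sketch of the coordination described above. It is illustrative only: the class and the exact structure are simplified stand-ins, not the actual RecordAccumulator code, though the names follow those used in this discussion (appendsInProgress, close flag, abortIncompleteBatches, hasUnsent).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of the append/close coordination, for discussion only.
public class AccumulatorSketch {
    private final AtomicInteger appendsInProgress = new AtomicInteger(0);
    private volatile boolean closed = false;
    private final Deque<Object> deque = new ArrayDeque<>();

    public void append(Object record) {
        // Mark the append in progress BEFORE checking the closed flag, so a
        // concurrent close either sees this append and waits for it, or this
        // append sees the closed flag and aborts. Checking the flag first
        // would leave a window where close() slips in between the check and
        // the increment, and the last message or batch could be missed.
        appendsInProgress.incrementAndGet();
        try {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            synchronized (deque) {
                // Re-check under the deque lock: once the flag is set,
                // hasUnsent() may already have scanned this deque, so a batch
                // added here could be silently dropped.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                deque.addLast(record);
            }
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }

    public void abortIncompleteBatches() {
        // Set the flag first, then wait for in-flight appends to drain.
        // After this loop exits, no new batch can be appended.
        closed = true;
        while (appendsInProgress.get() > 0)
            Thread.yield(); // simplified busy-wait for illustration
        synchronized (deque) {
            deque.clear(); // abort whatever is left
        }
    }
}
```

The key ordering property: append() publishes its intent (the increment) before reading the flag, while abortIncompleteBatches() publishes the flag before reading the counter, so at least one side always observes the other and no append can slip through after the abort completes.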
> On April 11, 2015, 8:02 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 176
> > <https://reviews.apache.org/r/31850/diff/8/?file=923927#file923927line176>
> >
> >     Can you explain this check? I don't think this actually fixes things, as the close could happen after the check.

Please see the previous reply.


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79813
-----------------------------------------------------------


On April 10, 2015, 10:09 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 10, 2015, 10:09 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> Incorporated Guozhang's comments.
> 
> Modify according to the latest conclusion.
> 
> Patch for the finally passed KIP-15.
> 
> Addressed Joel and Guozhang's comments.
> 
> Rebased on trunk.
> 
> Rebase on trunk.
> 
> Addressed Joel's comments.
> 
> Addressed Joel's comments.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 