> On April 7, 2015, 1:28 a.m., Joel Koshy wrote: > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, > > line 362 > > <https://reviews.apache.org/r/31850/diff/5/?file=908540#file908540line362> > > > > As you explained offline, the sender does not have access to record > > batches while requests are in flight, but it would be super if we can > > figure out a way to avoid leaking details of batch completion (which is > > currently exclusively in sender) into the RecordAccumulator. > > Guozhang Wang wrote: > Actually, since the incomplete batches list was introduced when we add > the flush() call, we are sort of leaking it to accumulator already before > this patch. And I feel it is not that bad to add this list into the > accumulator.
Yes, the incomplete batch set was added for flush(). We may be able to just get the incomplete batches from recorad accumulator and fail them all in the sender. In that case all the batch.done will only be called in sender. But we need to expose accumulator to sender in this case. I actually just found another synchronization problem between Accumulator.close and Accumulator.append. It is possible for a user thread append a message after accumulator is closed. We might miss callback for that last message in that case. > On April 7, 2015, 1:28 a.m., Joel Koshy wrote: > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, > > line 365 > > <https://reviews.apache.org/r/31850/diff/5/?file=908540#file908540line365> > > > > I don't think we should overload InterruptException for this. > > InterruptException is a wrapper around InterruptedException. i.e., after an > > InterruptException the thread should in fact have been interrupted - i.e., > > the interrupt status of the thread should be true (which is not the case > > here). That makes sense. My intention was trying to say the send was interrupted. Maybe it's better to use IllegalStateException. > On April 7, 2015, 1:28 a.m., Joel Koshy wrote: > > core/src/test/scala/integration/kafka/api/ProducerSendTest.scala, line 333 > > <https://reviews.apache.org/r/31850/diff/5/?file=908545#file908545line333> > > > > Can you also add a test for calling close with a non-zero timeout in > > the callback? It is actually tested in line 394. But the comment was not right. I'll modify that. - Jiangjie ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31850/#review79063 ----------------------------------------------------------- On March 27, 2015, 11:35 p.m., Jiangjie Qin wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/31850/ > ----------------------------------------------------------- > > (Updated March 27, 2015, 11:35 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1660 > https://issues.apache.org/jira/browse/KAFKA-1660 > > > Repository: kafka > > > Description > ------- > > A minor fix. > > > Incorporated Guozhang's comments. > > > Modify according to the latest conclusion. > > > Patch for the finally passed KIP-15git status > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > ab263423ff1d33170effb71acdef3fc501fa072a > clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java > 6913090af03a455452b0b5c3df78f266126b3854 > clients/src/main/java/org/apache/kafka/clients/producer/Producer.java > 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java > 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java > 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 > > clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java > fee322fa0dd9704374db4a6964246a7d2918d3e4 > clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java > c2fdc23239bd2196cd912c3d121b591f21393eab > > clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java > e379ac89c9a2fbfe750d6b0dec693b7eabb76204 > core/src/test/scala/integration/kafka/api/ProducerSendTest.scala > 3df450784592b894008e7507b2737f9bb07f7bd2 > > Diff: https://reviews.apache.org/r/31850/diff/ > > > Testing > ------- > > Unit tests passed. > > > Thanks, > > Jiangjie Qin > >