> On April 7, 2015, 1:28 a.m., Joel Koshy wrote: > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, > > line 362 > > <https://reviews.apache.org/r/31850/diff/5/?file=908540#file908540line362> > > > > As you explained offline, the sender does not have access to record > > batches while requests are in flight, but it would be super if we can > > figure out a way to avoid leaking details of batch completion (which is > > currently exclusively in sender) into the RecordAccumulator.
Actually, since the incomplete batches list was introduced when we add the flush() call, we are sort of leaking it to accumulator already before this patch. And I feel it is not that bad to add this list into the accumulator. - Guozhang ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31850/#review79063 ----------------------------------------------------------- On March 27, 2015, 11:35 p.m., Jiangjie Qin wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/31850/ > ----------------------------------------------------------- > > (Updated March 27, 2015, 11:35 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1660 > https://issues.apache.org/jira/browse/KAFKA-1660 > > > Repository: kafka > > > Description > ------- > > A minor fix. > > > Incorporated Guozhang's comments. > > > Modify according to the latest conclusion. > > > Patch for the finally passed KIP-15git status > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > ab263423ff1d33170effb71acdef3fc501fa072a > clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java > 6913090af03a455452b0b5c3df78f266126b3854 > clients/src/main/java/org/apache/kafka/clients/producer/Producer.java > 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java > 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java > 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 > > clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java > fee322fa0dd9704374db4a6964246a7d2918d3e4 > clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java > c2fdc23239bd2196cd912c3d121b591f21393eab > > clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java > e379ac89c9a2fbfe750d6b0dec693b7eabb76204 > core/src/test/scala/integration/kafka/api/ProducerSendTest.scala > 3df450784592b894008e7507b2737f9bb07f7bd2 > > Diff: https://reviews.apache.org/r/31850/diff/ > > > Testing > ------- > > Unit tests passed. > > > Thanks, > > Jiangjie Qin > >