> On June 26, 2014, 4:57 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 76
> > <https://reviews.apache.org/r/22874/diff/2/?file=617636#file617636line76>
> >
> >     I don't understand this parameter.

Explained above.


> On June 26, 2014, 4:57 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 197
> > <https://reviews.apache.org/r/22874/diff/2/?file=617636#file617636line197>
> >
> >     The logic
> >       this.lingerMs < DEFAULT_PARTITION_EXPIRY_MS
> >     doesn't make sense to me. Shouldn't it be
> >       now - batch.createdMs > this.lingerMs

Explained above.


> On June 26, 2014, 4:57 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 231
> > <https://reviews.apache.org/r/22874/diff/2/?file=617636#file617636line231>
> >
> >     Why does this initialize to a default of 100ms? Why not return the
> >     observed min?

Explained above.


> On June 26, 2014, 4:57 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 60
> > <https://reviews.apache.org/r/22874/diff/2/?file=617636#file617636line60>
> >
> >     I think technically this parameter indicates that the batch is
> >     complete, not necessarily full, right? That is, we also set it to true
> >     if the batch is incomplete but the linger time has expired, I assume...

Actually, the caller thread will only wake up the sender when the batch is
full; time expiry is handled by the sender itself. The only corner case is in
the ready call: when no partition has any data (i.e. every partition's deque
is empty) we still need to return a timeout value, so a default of 100ms is
used. Then, when the first message is appended into a batch, the caller thread
needs to check whether the linger time is smaller than the default value and,
if so, wake up the sender as well, since it may be selecting with the default
value.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail.
To reply, visit: https://reviews.apache.org/r/22874/#review46715
-----------------------------------------------------------


On June 25, 2014, 11:44 p.m., Guozhang Wang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/22874/
> -----------------------------------------------------------
>
> (Updated June 25, 2014, 11:44 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1498
>     https://issues.apache.org/jira/browse/KAFKA-1498
>
>
> Repository: kafka
>
>
> Description
> -------
>
> 1. Use a size limit on the memory records to guard against too-large
>    messages;
> 2. Caller thread checks partition readiness due to batch size upon append,
>    and only wakes up the sender when the appended partition is ready;
> 3. Sender thread selects its timeout based on the partition readiness
>    timeout and the metadata timeout;
> 4. Mirror maker uses one blocking queue per producer thread;
> 5. Other minor fixes.
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 522881c972ca42ff4dfb6237a2db15b625334d7e
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 00775abbcac850b0f2bb9a70b6fbc7cdf319bcf6
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 57bc285c20b5af8957bcc5322cd75c021a5af215
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 1ed3c28b436d28381d9402896e32d16f2586c65e
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 6fb5b82dedb48d946d1ac1ec7a535bddfdc693fa
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 759f577eaf0e7d28a84926d4aa30f4ef0cb27bc2
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 6a3cdcc1f2542479f37bc339baca87464c01e84e
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 8b4ac0f9a59b4f2e67e48e6d9b0d9fe340f77166
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 93b58d02eac0f8ca28440e3e0ebea28ed3a7673c
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 5489acac6806b3ae5e6d568d401d5a20c86cac05
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 763839157d9736f15110072bcae93fc7fdc33f55
>
> Diff: https://reviews.apache.org/r/22874/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Guozhang Wang
>
>
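
For readers following the thread, the wake-up rule described in the reply to
the line 60 comment can be sketched roughly as follows. This is a simplified,
hypothetical model and not the code from the patch: the class
ReadyCheckSketch, its Batch type, and the methods append and nextTimeoutMs are
invented for illustration, and counting records stands in for the real
byte-size check. Only lingerMs, createdMs, DEFAULT_PARTITION_EXPIRY_MS, and
the 100ms default are taken from the discussion.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the rule discussed in this thread.
// It is NOT the RecordAccumulator implementation from the patch.
final class ReadyCheckSketch {
    // Timeout the sender falls back to when every partition's deque is empty.
    static final long DEFAULT_PARTITION_EXPIRY_MS = 100L;

    // Assumption: a batch is "full" after a fixed number of records.
    static final class Batch {
        final long createdMs;
        final int capacity;
        int size;

        Batch(long createdMs, int capacity) {
            this.createdMs = createdMs;
            this.capacity = capacity;
        }

        boolean isFull() {
            return size >= capacity;
        }
    }

    private final long lingerMs;
    private final int batchCapacity;
    private final Map<String, Deque<Batch>> batches = new HashMap<>();

    ReadyCheckSketch(long lingerMs, int batchCapacity) {
        this.lingerMs = lingerMs;
        this.batchCapacity = batchCapacity;
    }

    // Caller thread: append one record and report whether to wake the sender.
    boolean append(String partition, long nowMs) {
        Deque<Batch> deque = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        Batch last = deque.peekLast();
        boolean newBatch = (last == null || last.isFull());
        if (newBatch) {
            last = new Batch(nowMs, batchCapacity);
            deque.addLast(last);
        }
        last.size++;
        // Wake the sender when the batch filled up, or when this is the first
        // record of a batch and the linger time is shorter than the default
        // the sender may currently be selecting with.
        return last.isFull() || (newBatch && lingerMs < DEFAULT_PARTITION_EXPIRY_MS);
    }

    // Sender thread: how long it may block before some batch needs attention.
    long nextTimeoutMs(long nowMs) {
        long timeout = DEFAULT_PARTITION_EXPIRY_MS;
        boolean sawBatch = false;
        for (Deque<Batch> deque : batches.values()) {
            Batch first = deque.peekFirst();
            if (first == null) {
                continue;
            }
            // A full batch is ready now; otherwise wait out the remaining linger time.
            long remaining = first.isFull()
                    ? 0L
                    : Math.max(0L, first.createdMs + lingerMs - nowMs);
            timeout = sawBatch ? Math.min(timeout, remaining) : remaining;
            sawBatch = true;
        }
        return timeout;
    }
}
```

In this model, a linger time at or above the default never triggers a wake-up
on the first append, because the sender returns from its select within the
default 100ms anyway and then picks up the observed minimum.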