> On Feb. 24, 2015, 4:22 a.m., Jiangjie Qin wrote: > > LGTM. Thanks, Jay. > > I actually tried just putting a synchronized block around the line where we > > copy the imcomplete set and it seems worked. Maybe we can do that if you > > prefer less code. > > Jay Kreps wrote: > I think that depends on the lock the add/remove uses in the internals of > Collections.syncronizedSet which could vary by JVM and version. I also think > that whenever possible ad hoc synchronization should be encapsulated in a > small class rather than sprinkled here and there in a larger class just so it > is easy to verify correctness, even when that is slightly more code.
Makes sense. It just occurred to me that current approach might causing a flush() wait up to linger.ms. Imagine there are two threads and with the following sequence: 1. thread 1 call flush 2. accumulator.flushing = true 3. sender thread woke up and did one drain. 4. thread 1 started wating on callback 1 5. thread 2 call send and followed by a flush 6. sender thread finished callback 1 and thread 1 set flushing to false. 7. sender thread will not be able to continue to honor the flush for thread 2 because flushing flag has been turned off. The message sent by thread 2 in step 5 will sitting in accumulator for linger.ms and thread 2 will be blocked. I think we can make the flushing to be an atomic interger instead of boolean, so each thread just increment it when begins flush and decrement it after flush finishes. As long as flushing > 0 the accumulator should flush the data. - Jiangjie ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/30763/#review73749 ----------------------------------------------------------- On Feb. 24, 2015, 2:31 a.m., Jay Kreps wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/30763/ > ----------------------------------------------------------- > > (Updated Feb. 24, 2015, 2:31 a.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1865 > https://issues.apache.org/jira/browse/KAFKA-1865 > > > Repository: kafka > > > Description > ------- > > KAFKA-1865 Add a flush() method to the producer. > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/Metadata.java > e8afecda956303a6ee116499fd443a54c018e17d > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > 1fd6917c8a5131254c740abad7f7228a47e3628c > clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java > 84530f2b948f9abd74203db48707e490dd9c81a5 > clients/src/main/java/org/apache/kafka/clients/producer/Producer.java > 17fe541588d462c68c33f6209717cc4015e9b62f > clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java > 4990692efa6f01c62e1d7b05fbf31bec50e398c9 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java > 4a2da41f47994f778109e3c4107ffd90195f0bae > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java > ecfe2144d778a5d9b614df5278b9f0a15637f10b > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java > dd0af8aee98abed5d4a0dc50989e37888bb353fe > > clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java > d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 > clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java > 4ae43ed47e31ad8052b4348a731da11120968508 > clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java > 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 > > clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java > 75513b0bdd439329c5771d87436ef83fda853bfb > > clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java > 29c8417422c0cf0d29bf2405c77fd05e35350259 > > clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java > 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 > clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java > 558942aaecd1b9f7098435d39aa4b362cd16ff0a > core/src/test/scala/integration/kafka/api/ConsumerTest.scala > 2802a399bf599e9530f53b7df72f12702a10d3c4 > core/src/test/scala/integration/kafka/api/ProducerSendTest.scala > b15237b76def3b234924280fa3fdb25dbb0cc0dc > core/src/test/scala/unit/kafka/utils/TestUtils.scala > 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 > > Diff: https://reviews.apache.org/r/30763/diff/ > > > Testing > ------- > > New patch addresses feedback. Also (1) comments out the consumer tests so I > could verify everything else passes and (2) moves some unit tests I found > that were in the wrong packages. > > > Thanks, > > Jay Kreps > >