I handle this by synchronizing flush()--only one thread can be flushing at
a time. This isn't too much of a drawback: the first flush will drain
everything anyway, so the second flush likely won't have much left to do,
and sequencing them shouldn't hurt much.
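
For concreteness, here is roughly the shape of what I mean (just a sketch
of the locking, not the actual patch; class and method names are made up):

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.Future;

    // Sketch of the "serialize the flushes" approach.
    public class SerializedFlushSketch {
        // Futures for batches that have not completed yet.
        private final Set<Future<?>> incomplete = new HashSet<Future<?>>();
        // The sender bypasses linger.ms while a flush is in progress.
        private volatile boolean flushing = false;

        public boolean flushInProgress() {
            return flushing;
        }

        public void add(Future<?> batch) {
            synchronized (incomplete) {
                incomplete.add(batch);
            }
        }

        public void complete(Future<?> batch) {
            synchronized (incomplete) {
                incomplete.remove(batch);
            }
        }

        // Only one thread can be in here at a time; a second caller blocks
        // until the first flush has drained everything and then has little
        // left to wait for.
        public synchronized void flush() throws Exception {
            flushing = true;
            try {
                Set<Future<?>> copy;
                synchronized (incomplete) {
                    copy = new HashSet<Future<?>>(incomplete);
                }
                for (Future<?> batch : copy)
                    batch.get();
            } finally {
                flushing = false;
            }
        }
    }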

However, your idea of using a counter may actually be better and could
possibly remove the synchronization entirely. Let's both think that through
and see if we can come up with any corner cases. If not, I'll change to
that.

-Jay

On Mon, Feb 23, 2015 at 8:46 PM, Jiangjie Qin <becket....@gmail.com> wrote:

>
>
> > On Feb. 24, 2015, 4:22 a.m., Jiangjie Qin wrote:
> > > LGTM. Thanks, Jay.
> > > I actually tried just putting a synchronized block around the line
> > > where we copy the incomplete set and it seemed to work. Maybe we can
> > > do that if you prefer less code.
> >
> > Jay Kreps wrote:
> >     I think that depends on the lock the add/remove uses in the
> >     internals of Collections.synchronizedSet, which could vary by JVM
> >     and version. I also think that, whenever possible, ad hoc
> >     synchronization should be encapsulated in a small class rather than
> >     sprinkled here and there in a larger class, just so it is easy to
> >     verify correctness, even when that is slightly more code.
>
> Makes sense. It just occurred to me that the current approach might cause
> a flush() to wait up to linger.ms.
>
> Imagine two threads with the following sequence:
> 1. Thread 1 calls flush().
> 2. accumulator.flushing is set to true.
> 3. The sender thread wakes up and does one drain.
> 4. Thread 1 starts waiting on callback 1.
> 5. Thread 2 calls send() followed by a flush().
> 6. The sender thread finishes callback 1 and thread 1 sets flushing to
> false.
> 7. The sender thread can no longer honor the flush for thread 2 because
> the flushing flag has been turned off.
> The message sent by thread 2 in step 5 will sit in the accumulator for up
> to linger.ms and thread 2 will be blocked.
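>
> In code, the kind of unguarded flag handling I am worried about looks
> roughly like this (only a sketch with made-up names, not the actual
> patch):
>
>     import java.util.concurrent.CountDownLatch;
>
>     // Sketch of a flag-based flush that two threads can race on.
>     public class FlagFlushSketch {
>         private volatile boolean flushing = false;
>
>         // The sender only bypasses linger.ms while this is true.
>         public boolean flushInProgress() {
>             return flushing;
>         }
>
>         // 'pending' stands in for waiting on the copied record futures.
>         public void flush(CountDownLatch pending)
>                 throws InterruptedException {
>             flushing = true;   // step 2 (thread 1), again in step 5 (thread 2)
>             pending.await();   // step 4: thread 1 waits on callback 1
>             flushing = false;  // step 6: thread 1 clears the shared flag, so
>                                // in step 7 the sender stops expediting
>                                // thread 2's records until linger.ms expires
>         }
>     }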
>
> I think we can make flushing an atomic integer instead of a boolean, so
> each thread just increments it when it begins a flush and decrements it
> after the flush finishes. As long as flushing > 0 the accumulator should
> keep flushing the data.
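>
> Roughly, as a sketch (names made up):
>
>     import java.util.concurrent.atomic.AtomicInteger;
>
>     // Sketch of the counter-based approach.
>     public class FlushCounterSketch {
>         private final AtomicInteger flushesInProgress = new AtomicInteger(0);
>
>         // The sender keeps expediting drains while any flush is in progress.
>         public boolean flushInProgress() {
>             return flushesInProgress.get() > 0;
>         }
>
>         public void beginFlush() {
>             flushesInProgress.incrementAndGet();
>         }
>
>         public void endFlush() {
>             flushesInProgress.decrementAndGet();
>         }
>     }
>
> Each flush() would then call beginFlush(), wait on its copy of the
> incomplete futures, and call endFlush() in a finally block, so thread 1
> finishing can no longer hide thread 2's flush.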
>
>
> - Jiangjie
>
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/#review73749
> -----------------------------------------------------------
>
>
> On Feb. 24, 2015, 2:31 a.m., Jay Kreps wrote:
> >
> > -----------------------------------------------------------
> > This is an automatically generated e-mail. To reply, visit:
> > https://reviews.apache.org/r/30763/
> > -----------------------------------------------------------
> >
> > (Updated Feb. 24, 2015, 2:31 a.m.)
> >
> >
> > Review request for kafka.
> >
> >
> > Bugs: KAFKA-1865
> >     https://issues.apache.org/jira/browse/KAFKA-1865
> >
> >
> > Repository: kafka
> >
> >
> > Description
> > -------
> >
> > KAFKA-1865 Add a flush() method to the producer.
> >
> >
> > Diffs
> > -----
> >
> >   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d
> >   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c
> >   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5
> >   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f
> >   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9
> >   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae
> >   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b
> >   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe
> >   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION
> >   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3
> >   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508
> >   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8
> >   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb
> >   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 29c8417422c0cf0d29bf2405c77fd05e35350259
> >   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2
> >   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a
> >   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4
> >   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc
> >   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698
> >
> > Diff: https://reviews.apache.org/r/30763/diff/
> >
> >
> > Testing
> > -------
> >
> > New patch addresses feedback. Also (1) comments out the consumer tests
> > so I could verify everything else passes and (2) moves some unit tests I
> > found that were in the wrong packages.
> >
> >
> > Thanks,
> >
> > Jay Kreps
> >
> >
>
>
