> On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote: > > core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 109 > > <https://reviews.apache.org/r/26755/diff/3/?file=722245#file722245line109> > > > > getAndDecrement(sizeFunction.get(e)) > > Jiangjie Qin wrote: > It seems getAndDecrement() does not take argument and will always > decrement by 1.
ah yes you are right > On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote: > > core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 82 > > <https://reviews.apache.org/r/26755/diff/3/?file=722245#file722245line82> > > > > One significant caveat to this approach (and in the timed variant > > above) is that if a single large element needs to be enqueued it could > > potentially block a number of smaller elements from being enqueued. This > > may be okay in the case of mirror maker though but would make it less > > useful as a generic utility. > > Jiangjie Qin wrote: > I'm not sure why the big put could block small ones... It is possible > that there is a super big item put into the queue and makes the queue to pass > the byte limit by a lot. In that case, all the put will be blocked until a > bunch of small messages are taken out of the queue. But it seems to be the > purpose of having a byte limit for the queue. I looked again. Yes you are right. It should not block smaller puts. Now I'm going to ask the question from the other side of the table: since you are just notifying waiting threads, it is possible for a large put to get starved if there are a lot of smaller puts that get notified earlier. To the best of my knowledge the JVM does not guarantee fairness in unblocking multiple contending threads. Ideally there should be some notion of maximum wait before a put attempt takes priority over others. i.e., these are nuances that may be a compelling reason to make it a specialized utility within MirrorMaker itself since it is not general enough (yet). - Joel ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26755/#review58497 ----------------------------------------------------------- On Oct. 27, 2014, 6:50 a.m., Jiangjie Qin wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/26755/ > ----------------------------------------------------------- > > (Updated Oct. 27, 2014, 6:50 a.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1706 > https://issues.apache.org/jira/browse/KAFKA-1706 > > > Repository: kafka > > > Description > ------- > > changed arguments name > > > correct typo. > > > Incorporated Joel's comments. Also fixed negative queue size problem. > > > Diffs > ----- > > core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION > > Diff: https://reviews.apache.org/r/26755/diff/ > > > Testing > ------- > > > Thanks, > > Jiangjie Qin > >