----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26755/#review58497 -----------------------------------------------------------
core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99555> I'm wondering if this is specific and nuanced enough to make it entirely private to MirrorMaker.scala OR if you think it is useful as a generic utility consider putting in org.apache.kafka.clients.common.utils core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99550> Unused core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99561> Can you just place a scaladoc ref to BlockingQueue.offer? and then add a few lines describing further how this differs. core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99562> This can be paraphrased to be simpler: "An element can be enqueued provided the current size (in number of elements) is within the configured capacity and the current size in bytes of the queue is within the configured byte capacity. i.e., the element may be enqueued even if adding it causes the queue's size in bytes to exceed the byte capacity." Ok, so while I was thinking through the above: is there any benefit to having a count-based capacity when you have a byte-based capacity? i.e., why not have byte-capacity only? core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99566> The var can be avoided if you put false in the catch block. I think it would be clearer that way as well. In any event, I actually don't think we need the catch block. core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99565> s/success/enqueued/ core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99564> if (enqueued) core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99579> I don't think we need to catch this. i.e., most blocking calls are meant to throw InterruptedException. E.g., the standard BlockingQueue.put() can throw InterruptedException core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99567> Similar comment as above wrt scaladoc core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99568> Don't really need a var here (if you just say false in the catch block) core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99569> One significant caveat to this approach (and in the timed variant above) is that if a single large element needs to be enqueued it could potentially block a number of smaller elements from being enqueued. This may be okay in the case of mirror maker though but would make it less useful as a generic utility. core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99578> This offer method is blocking. Interestingly the standard BlockingQueue.offer is non-blocking. BlockingQueue.put is blocking. We should probably stick to that convention. core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99580> See comment above on whether we need this core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99570> Can you clarify what this means? core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99571> getAndDecrement(sizeFunction.get(e)) core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99572> notify should only be called by a thread that is the owner of this object's monitor (see the Javadoc) core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99581> For consistence with BlockingQueue this can be named take() core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99574> getAndDecrement core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99573> Same comment as above core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99577> Provide a message core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99575> addAndDecrement core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala <https://reviews.apache.org/r/26755/#comment99576> Synchronization comment above - Joel Koshy On Oct. 15, 2014, 4:28 p.m., Jiangjie Qin wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/26755/ > ----------------------------------------------------------- > > (Updated Oct. 15, 2014, 4:28 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1706 > https://issues.apache.org/jira/browse/KAFKA-1706 > > > Repository: kafka > > > Description > ------- > > changed arguments name > > > correct typo. > > > Diffs > ----- > > core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION > > Diff: https://reviews.apache.org/r/26755/diff/ > > > Testing > ------- > > > Thanks, > > Jiangjie Qin > >