-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26755/#review58497
-----------------------------------------------------------



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99555>

    I'm wondering if this is specific and nuanced enough to make it entirely 
private to MirrorMaker.scala
    
    OR
    
    if you think it is useful as a generic utility consider putting in 
org.apache.kafka.clients.common.utils



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99550>

    Unused



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99561>

    Can you just place a scaladoc ref to BlockingQueue.offer? and then add a 
few lines describing further how this differs.



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99562>

    This can be paraphrased to be simpler:
    
    "An element can be enqueued provided the current size (in number of 
elements) is within the configured capacity and the current size in bytes of 
the queue is within the configured byte capacity. i.e., the element may be 
enqueued even if adding it causes the queue's size in bytes to exceed the byte 
capacity."
    
    Ok, so while I was thinking through the above: is there any benefit to 
having a count-based capacity when you have a byte-based capacity? i.e., why 
not have byte-capacity only?



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99566>

    The var can be avoided if you put false in the catch block.
    
    I think it would be clearer that way as well.
    
    In any event, I actually don't think we need the catch block.



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99565>

    s/success/enqueued/



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99564>

    if (enqueued)



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99579>

    I don't think we need to catch this. i.e., most blocking calls are meant to 
throw InterruptedException. E.g., the standard BlockingQueue.put() can throw 
InterruptedException



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99567>

    Similar comment as above wrt scaladoc



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99568>

    Don't really need a var here (if you just say false in the catch block)



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99569>

    One significant caveat to this approach (and in the timed variant above) is 
that if a single large element needs to be enqueued it could potentially block 
a number of smaller elements from being enqueued. This may be okay in the case 
of mirror maker though but would make it less useful as a generic utility.



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99578>

    This offer method is blocking. Interestingly the standard 
BlockingQueue.offer is non-blocking. BlockingQueue.put is blocking. We should 
probably stick to that convention.



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99580>

    See comment above on whether we need this



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99570>

    Can you clarify what this means?



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99571>

    getAndDecrement(sizeFunction.get(e))



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99572>

    notify should only be called by a thread that is the owner of this object's 
monitor (see the Javadoc)



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99581>

    For consistence with BlockingQueue this can be named take()



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99574>

    getAndDecrement



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99573>

    Same comment as above



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99577>

    Provide a message



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99575>

    addAndDecrement



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/26755/#comment99576>

    Synchronization comment above


- Joel Koshy


On Oct. 15, 2014, 4:28 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/26755/
> -----------------------------------------------------------
> 
> (Updated Oct. 15, 2014, 4:28 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1706
>     https://issues.apache.org/jira/browse/KAFKA-1706
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> changed arguments name
> 
> 
> correct typo.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/26755/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>

Reply via email to