-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review78668
-----------------------------------------------------------


Thanks for the new patch. Now that I understand the logic better, I think this 
is a really smart implementation. A few more comments below.


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment128118>

    We should probably call forceComplete() first and run onExpiration() only 
if it returns true.
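
    A minimal sketch of that ordering, assuming forceComplete() returns true 
only for the caller that actually completes the operation. SketchOperation and 
expire() are hypothetical stand-ins, not the classes in the patch:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for DelayedOperation; method names follow the comment above.
abstract class SketchOperation {
  private val completed = new AtomicBoolean(false)

  def onComplete(): Unit
  def onExpiration(): Unit

  // Returns true only for the single caller that wins the race to complete.
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true } else false

  // Expiration path: run the callback only if this call did the completing.
  def expire(): Unit = if (forceComplete()) onExpiration()
}
```

    With this ordering, an operation that was already completed by another 
thread never runs its expiration callback.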



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment128145>

    By default, a Java executor service swallows all unhandled exceptions. For 
easier debugging, we will need to add an UncaughtExceptionHandler that logs the 
exception as an error. See KafkaScheduler.startup() and Utils.newThread().
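
    One possible shape for this, as a sketch only. ExpirationExecutor, create() 
and the logError callback are hypothetical names; the real code would use the 
broker's logging instead:

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}

object ExpirationExecutor {
  // Hypothetical helper: a single-thread executor whose worker thread reports
  // uncaught exceptions through `logError` instead of dropping them silently.
  def create(name: String, logError: (String, Throwable) => Unit): ExecutorService =
    Executors.newSingleThreadExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name)
        t.setDaemon(true)
        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
          def uncaughtException(thread: Thread, e: Throwable): Unit =
            logError("Uncaught exception in thread '" + thread.getName + "':", e)
        })
        t
      }
    })
}
```

    Note that the handler only fires for tasks passed to execute(). With 
submit(), the exception is captured in the returned Future and has to be 
retrieved through get().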



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment128129>

    Perhaps we should trigger a purge if the total number of purgeable 
operations (instead of the number of unique purgeable operations) is more than 
the purgeInterval. This can be estimated as 
watched/estimatedTotalOperations.get * (estimatedTotalOperations.get - delayed).
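
    A rough sketch of that estimate. The meanings given to the parameters in 
the comments are assumptions about the patch, and PurgeEstimate is a 
hypothetical name:

```scala
object PurgeEstimate {
  // Assumed meanings (not confirmed against the patch):
  //   watched: total number of watcher-list entries across all keys
  //   total:   estimatedTotalOperations.get, operations added so far
  //   delayed: operations still pending in the timer
  // (watched / total) approximates watcher entries per operation, and
  // (total - delayed) approximates the number of completed operations.
  def purgeable(watched: Int, total: Int, delayed: Int): Int =
    if (total <= 0) 0
    else (watched.toDouble / total * (total - delayed)).toInt
}
```

    For example, with watched = 300, total = 100 and delayed = 40, the estimate 
is 3 entries per operation times 60 completed operations, i.e. 180 purgeable 
entries.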



core/src/main/scala/kafka/utils/timer/Timer.scala
<https://reviews.apache.org/r/31568/#comment128139>

    Is Timer too general a name? Should we rename it to something like 
DelayedOperationTimer?



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment128109>

    For the hierarchical part, let's say that u is 1 and n is 2. If current time 
is c, then the buckets at different levels are:
    
    level    buckets
    1        [c,c]   [c+1,c+1]
    2        [c,c+1] [c+2,c+3]
    3        [c,c+3] [c+4,c+7]
    
    So, at any given point of time, [c,c+1] at level 2 and [c,c+3] at level 3 
will never be used since those buckets are already covered in the lower level.
    
    This seems a bit wasteful. To remove that waste, we could choose to start 
the 2nd level at c+2 and the 3rd level at c+6, etc. Do we choose to use the 
same current time as the start time at all levels for simplicity? If so, this 
is probably fine since the larger the n, the less the waste. However, it's 
probably worth documenting that the buckets at different levels can overlap?
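
    The layout in the table can be reproduced with a small sketch, assuming 
every level starts at the same current time c. WheelLayout and buckets() are 
hypothetical names used only for illustration:

```scala
object WheelLayout {
  // Inclusive [start, end] ranges of the first `count` buckets at `level`,
  // for tick size u and wheel size n, with all levels starting at time c.
  def buckets(level: Int, c: Long, u: Long, n: Int, count: Int): Seq[(Long, Long)] = {
    // One bucket at level k covers u * n^(k-1) time units.
    val span = Iterator.iterate(u)(_ * n).drop(level - 1).next()
    (0 until count).map { i => (c + i * span, c + (i + 1) * span - 1) }
  }
}
```

    With c = 0, u = 1 and n = 2, buckets(2, 0L, 1L, 2, 2) gives (0,1) and 
(2,3), so the first bucket at level 2 covers the same range as all of level 1.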



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment127579>

    A overflow => An overflow
    
    in a overflow => in an overflow



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment127580>

    moved the finer => moved to the finer



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment128112>

    It would be useful to add that the timing wheel implementation is used to 
optimize the common case when operations are completed before they time out.



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment128117>

    I was initially puzzled about how we synchronize between add() and 
advanceClock(). Then I realized that the synchronization is actually done in 
the caller in Timer. I was thinking of how to make this part clearer. One way 
is to add the documentation here. Another way is to move the read/write lock 
from Timer to here, and pass in needed data structures like delayQueue.
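
    A sketch of what the caller-side synchronization might look like. This 
assumes add() takes the read lock and advanceClock() takes the write lock, which 
is an assumption and not confirmed against the patch; LockedWheel and its 
function parameters are hypothetical:

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical wrapper, not the patch's Timer: many threads may add
// concurrently under the read lock, while advancing the clock is exclusive.
class LockedWheel(addToWheel: Long => Unit, advanceWheel: Long => Unit) {
  private val lock = new ReentrantReadWriteLock()

  def add(expirationMs: Long): Unit = {
    lock.readLock().lock()
    try addToWheel(expirationMs) finally lock.readLock().unlock()
  }

  def advanceClock(timeMs: Long): Unit = {
    lock.writeLock().lock()
    try advanceWheel(timeMs) finally lock.writeLock().unlock()
  }
}
```

    Whichever class ends up owning the lock, a comment stating which lock 
guards add() and advanceClock() would make the contract visible at the point 
where the wheel is mutated.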



core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
<https://reviews.apache.org/r/31568/#comment128146>

    should have 5 => should have 6



core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
<https://reviews.apache.org/r/31568/#comment128148>

    Actually, where is the logic to not increment the counter on reinserting 
existing tasks? TimerTaskList.add() seems to always increment the counter.


- Jun Rao


On April 1, 2015, 8:50 p.m., Yasuhiro Matsuda wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> -----------------------------------------------------------
> 
> (Updated April 1, 2015, 8:50 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1989
>     https://issues.apache.org/jira/browse/KAFKA-1989
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> new purgatory implementation
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/server/DelayedOperation.scala 
> e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 44f0026e6676d8d764dd59dbcc6bb7bb727a3ba6 
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
> 7a37617395b9e4226853913b8989f42e7301de7c 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala 
> PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31568/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
>
