-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review78668
-----------------------------------------------------------

Thanks for the new patch. Now that I understand the logic better, I think this is a really smart implementation. A few more comments below.


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment128118>

    We should probably call forceComplete() first and run onExpiration() only if it returns true.


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment128145>

    By default, a Java executor service swallows all unhandled exceptions. For easier debugging, we need to add an UncaughtExceptionHandler that logs the exception as an error. See KafkaScheduler.startup() and Utils.newThread().


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment128129>

    Perhaps we should trigger a purge if the total number of purgeable operations (instead of the number of unique purgeable operations) is more than purgeInterval. This can be estimated as watched/estimatedTotalOperations.get * (estimatedTotalOperations.get - delayed), i.e., the average number of watch-list entries per operation times the estimated number of completed operations.


core/src/main/scala/kafka/utils/timer/Timer.scala
<https://reviews.apache.org/r/31568/#comment128139>

    Is Timer too general a name? Should we rename it to something like DelayedOperationTimer?


core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment128109>

    For the hierarchical part, let's say that u is 1 and n is 2. If the current time is c, then the buckets at different levels are:

    level  buckets
    1      [c,c]    [c+1,c+1]
    2      [c,c+1]  [c+2,c+3]
    3      [c,c+3]  [c+4,c+7]

    So, at any given point in time, [c,c+1] at level 2 and [c,c+3] at level 3 will never be used, since those ranges are already covered by the lower levels. This seems a bit wasteful. To remove that waste, we could start the 2nd level at c+2, the 3rd level at c+6, etc. Did we choose to use the same current time as the start time at all levels for simplicity? If so, this is probably fine, since the larger the n, the smaller the waste.
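To make the bucket layout above concrete, here is a small hypothetical Scala helper (BucketLayout, sharedStart and staggeredStart are illustrative names, not code from the patch). It assumes the comment's simplified model: u is the bucket width at level 1, n is the number of buckets per level, level k has n buckets that are each u * n^(k-1) wide, and buckets start exactly at c rather than being aligned to the bucket width. It computes both the shared-start layout in the table and the staggered alternative.

```scala
// Hypothetical helper, not code from the patch. Simplified model assumed:
//   u = bucket width at level 1, n = number of buckets per level,
//   level k has n buckets, each u * n^(k-1) wide, starting exactly at c.
object BucketLayout {
  // Inclusive [start, end] range of times covered by one bucket.
  final case class Bucket(start: Long, end: Long) {
    override def toString: String = s"[$start,$end]"
  }

  // Bucket widths for levels 1..levels: u, u*n, u*n^2, ...
  private def widths(u: Long, n: Int, levels: Int): List[Long] =
    Iterator.iterate(u)(_ * n).take(levels).toList

  // The n consecutive buckets of one level, starting at `start`.
  private def level(start: Long, width: Long, n: Int): IndexedSeq[Bucket] =
    (0 until n).map(i => Bucket(start + i * width, start + (i + 1) * width - 1))

  // Every level starts at the same current time c, so the first bucket of
  // every level above level 1 covers exactly the range of the level below it.
  def sharedStart(c: Long, u: Long, n: Int, levels: Int): List[IndexedSeq[Bucket]] =
    widths(u, n, levels).map(width => level(c, width, n))

  // Alternative from the comment: each level starts right after the range
  // covered by the level below it, so no range is covered twice.
  def staggeredStart(c: Long, u: Long, n: Int, levels: Int): List[IndexedSeq[Bucket]] = {
    val ws = widths(u, n, levels)
    // Level k starts where the n buckets of level k-1 end.
    val starts = ws.scanLeft(c)((start, width) => start + n * width).init
    starts.zip(ws).map { case (start, width) => level(start, width, n) }
  }
}
```

With c = 0, u = 1 and n = 2, sharedStart(0, 1, 2, 3) yields the rows "[0,0] [1,1]", "[0,1] [2,3]" and "[0,3] [4,7]" (the table above), while staggeredStart(0, 1, 2, 3) starts the three levels at 0, 2 and 6.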

    However, it's probably worth documenting that the buckets at different levels can overlap?


core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment127579>

    A overflow => An overflow
    in a overflow => in an overflow


core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment127580>

    moved the finer => moved to the finer


core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment128112>

    It would be useful to add that the timing wheel implementation is used to optimize the common case where operations are completed before they time out.


core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment128117>

    I was initially puzzled about how we synchronize between add() and advanceClock(). Then I realized that the synchronization is actually done in the caller, Timer. I was thinking about how to make this part clearer. One way is to add the documentation here. Another way is to move the read/write lock from Timer to here and pass in the needed data structures, such as delayQueue.


core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
<https://reviews.apache.org/r/31568/#comment128146>

    should have 5 => should have 6


core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
<https://reviews.apache.org/r/31568/#comment128148>

    Actually, where is the logic that avoids incrementing the counter when reinserting existing tasks? TimerTaskList.add() seems to always increment the counter.


- Jun Rao


On April 1, 2015, 8:50 p.m., Yasuhiro Matsuda wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> -----------------------------------------------------------
>
> (Updated April 1, 2015, 8:50 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1989
>     https://issues.apache.org/jira/browse/KAFKA-1989
>
>
> Repository: kafka
>
>
> Description
> -------
>
> new purgatory implementation
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5
>   core/src/main/scala/kafka/server/ReplicaManager.scala 44f0026e6676d8d764dd59dbcc6bb7bb727a3ba6
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION
>
> Diff: https://reviews.apache.org/r/31568/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Yasuhiro Matsuda
>
>
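On the first DelayedOperation.scala comment (#comment128118), here is a minimal sketch of the suggested ordering. It assumes forceComplete() is an atomic compare-and-set that returns true only for the one caller that actually completes the operation. SketchDelayedOperation is a hypothetical stand-in, not the class in the patch.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for DelayedOperation, not code from the patch.
// It shows the suggested expiration path: only the caller that wins
// forceComplete() goes on to run onExpiration().
abstract class SketchDelayedOperation extends Runnable {
  private val completed = new AtomicBoolean(false)

  def onComplete(): Unit
  def onExpiration(): Unit

  // Returns true only for the first caller; that caller runs onComplete().
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true }
    else false

  def isCompleted: Boolean = completed.get

  // Called by the timer when the operation times out.
  override def run(): Unit =
    if (forceComplete()) onExpiration()
}
```

With this ordering, a timer that fires after another thread has already completed the operation does nothing, so onExpiration() never runs for an operation that was completed normally.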
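On the UncaughtExceptionHandler comment (#comment128145), here is a minimal sketch of a thread factory that logs uncaught exceptions. It uses only java.util.concurrent APIs. LoggingExecutor and logError are hypothetical names; the real code would presumably reuse the Utils.newThread() and logging helpers mentioned in the comment. One caveat: the handler only sees exceptions from tasks passed to execute(). For tasks passed to submit(), the exception is captured in the returned Future and never reaches the handler, so those tasks would need their own try/catch or a check of Future.get().

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch, not code from the patch: an executor whose worker
// threads report uncaught exceptions instead of dying silently.
object LoggingExecutor {
  // Stand-in for the broker's error logging; replaceable for testing.
  @volatile var logError: (String, Throwable) => Unit =
    (msg, e) => System.err.println(s"ERROR $msg: $e")

  def newFixedThreadPool(nThreads: Int, namePrefix: String): ExecutorService = {
    val threadId = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$namePrefix-${threadId.getAndIncrement()}")
        t.setDaemon(true)
        // Invoked when a task run via execute() throws; the failed worker
        // thread exits and the pool starts a replacement worker.
        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
          override def uncaughtException(thread: Thread, e: Throwable): Unit =
            logError(s"Uncaught exception in thread '${thread.getName}'", e)
        })
        t
      }
    }
    Executors.newFixedThreadPool(nThreads, factory)
  }
}
```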
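On the purge-threshold comment (#comment128129), here is a minimal sketch of the suggested estimate. The parameters use the names from the comment (watched, estimatedTotalOperations, delayed, purgeInterval), but here they are plain integers rather than the patch's fields. PurgeHeuristic and its methods are hypothetical.

```scala
// Hypothetical sketch of the purge condition suggested in the review.
// Inputs mirror the comment's names but are plain values, not patch fields.
object PurgeHeuristic {
  /**
   * Estimated number of watch-list entries that belong to completed operations.
   *
   * @param watched                  total entries across all watch lists; an
   *                                 operation watched on k keys counts k times
   * @param estimatedTotalOperations estimated number of distinct operations
   * @param delayed                  operations still pending in the timer
   */
  def estimatedPurgeableEntries(watched: Int, estimatedTotalOperations: Int, delayed: Int): Long =
    if (estimatedTotalOperations <= 0) 0L
    else {
      // Distinct operations that are done but may still sit in watch lists.
      val completed = math.max(0, estimatedTotalOperations - delayed)
      // Average number of watch lists each operation was added to.
      val watchesPerOperation = watched.toDouble / estimatedTotalOperations
      (watchesPerOperation * completed).toLong
    }

  // Purge when the estimated total (not unique) purgeable entries exceed the interval.
  def shouldPurge(watched: Int, estimatedTotalOperations: Int, delayed: Int, purgeInterval: Int): Boolean =
    estimatedPurgeableEntries(watched, estimatedTotalOperations, delayed) > purgeInterval
}
```

For example, with 3000 watch-list entries, 1000 operations and 400 still delayed, the unique count is only 600, but the estimated number of purgeable entries is 3.0 * 600 = 1800, which exceeds a purgeInterval of 1000. The ratio is computed in floating point because, if both operands are Ints, watched / estimatedTotalOperations.get would be truncated by integer division.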