-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
-----------------------------------------------------------

We can probably remove DelayedItem if it is no longer referenced anywhere.


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment123994>

Add the "override" keyword to indicate that it is inherited from Runnable.

Also, the comment above it seems to refer to a variable, not a function, which is a bit misleading.


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124005>

We need to make tickMs and wheelSize configurable.


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124007>

TBD


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124136>

It seems we do not need to keep this as a class member variable; we can just compute the value on the fly in purge() every time.


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124140>

Do we need to synchronize on refQueue as well?


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124134>

Would it be useful to return the number of purged items?


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124135>

The threshold should be configurable.


core/src/main/scala/kafka/utils/timer/Timer.scala
<https://reviews.apache.org/r/31568/#comment124361>

I think bucket.flush(reinsert) will always fail on all the items, since their expiration time will always be < bucket expiration + tickMs, i.e. by the time the bucket is returned from the delay queue, all of its items have already expired. In this case, could we just call foreach(submit) on all of them instead of trying to reinsert them?

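To illustrate the reasoning in the Timer.scala comment above, here is a minimal sketch. It assumes a single wheel whose buckets each span exactly one tick; with coarser buckets the check could pass for some tasks. The names ReinsertCheck, canInsert, taskExpirationMs and currentTimeMs are hypothetical and are not taken from the patch.

```scala
// Hypothetical sketch; none of these names come from the patch under review.
object ReinsertCheck {
  // Assumed rule: a task fits in the wheel only if it expires at least
  // one tick after the current time; otherwise it counts as expired.
  def canInsert(taskExpirationMs: Long, currentTimeMs: Long, tickMs: Long): Boolean =
    taskExpirationMs >= currentTimeMs + tickMs

  def main(args: Array[String]): Unit = {
    val tickMs = 10L
    val bucketExpirationMs = 100L
    // Assumption: the bucket covers [bucketExpirationMs, bucketExpirationMs + tickMs).
    val taskExpirations = Seq(100L, 103L, 109L)
    // When the bucket comes out of the delay queue, the clock has reached its expiration.
    val currentTimeMs = bucketExpirationMs
    val reinsertable = taskExpirations.filter(canInsert(_, currentTimeMs, tickMs))
    println(s"reinsertable tasks: ${reinsertable.size}")
  }
}
```

Under that assumption no task passes the check, which is why submitting the tasks directly would be equivalent to attempting the reinsert first.
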

core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
<https://reviews.apache.org/r/31568/#comment124401>

It seems the task entry of a task will only be set once throughout its lifetime; even when the task entry gets reinserted, its correspondence to the task will not change, right? If that is true, we can just set the entry for the task in the constructor of the task entry.


core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment124359>

How about changing this comment to:

We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced one cycle and the previous bucket gets reused; further calls to set the expiration within the same wheel cycle will pass in the same value and hence return false, thus the bucket with the same expiration will not be enqueued multiple times.


core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
<https://reviews.apache.org/r/31568/#comment124462>

Could we just add an atomic integer recording the list size, and a size() function, to TimerTaskList?


core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
<https://reviews.apache.org/r/31568/#comment124459>

latch.await(0, TimeUnit.SECONDS)?


Since this is a rather complicated patch (even after reading the wiki page, it took me quite some time to get through the code), I would suggest adding more comments on the functions and member variables of each class.

- Guozhang Wang


On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> -----------------------------------------------------------
>
> (Updated Feb. 28, 2015, 12:14 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1989
>     https://issues.apache.org/jira/browse/KAFKA-1989
>
>
> Repository: kafka
>
>
> Description
> -------
>
> new purgatory implementation
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5
>   core/src/main/scala/kafka/server/ReplicaManager.scala 586cf4caa95f58d5b2e6c7429732d25d2b3635c8
>   core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION
>
> Diff: https://reviews.apache.org/r/31568/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Yasuhiro Matsuda
>
>
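
As a footnote to the TimingWheel.scala comment in the review, the "enqueue only when the expiration has changed" behaviour could be sketched as follows. This is only an illustration under assumed names: Bucket, setExpiration and the plain mutable queue standing in for the delay queue are hypothetical and may not match the patch.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable

// Hypothetical stand-in for a timing wheel bucket; not the patch's class.
class Bucket {
  private val expiration = new AtomicLong(-1L)

  // Returns true only when the stored expiration actually changes.
  def setExpiration(expirationMs: Long): Boolean =
    expiration.getAndSet(expirationMs) != expirationMs

  def getExpiration: Long = expiration.get()
}

object BucketDemo {
  def main(args: Array[String]): Unit = {
    val bucket = new Bucket
    // A plain queue stands in for the delay queue in this sketch.
    val queue = mutable.Queue.empty[Bucket]

    // Two tasks land in the same bucket within the same wheel cycle:
    // only the first call changes the expiration, so we enqueue once.
    if (bucket.setExpiration(100L)) queue.enqueue(bucket)
    if (bucket.setExpiration(100L)) queue.enqueue(bucket)
    assert(queue.size == 1)

    // The wheel advances one cycle and the bucket is reused with a new
    // expiration, so it is enqueued again.
    queue.dequeue()
    if (bucket.setExpiration(300L)) queue.enqueue(bucket)
    assert(queue.size == 1)
  }
}
```

Using getAndSet makes the compare and the update a single atomic step, so two threads setting the same expiration cannot both see "changed".
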