> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, line 68
> > <https://reviews.apache.org/r/31568/diff/1/?file=881356#file881356line68>
> >
> > I think bucket.flush(reinsert) will always fail on all the items since
> > their expiration time will always be < bucket expiration + tickMs, i.e. the
> > returned bucket from the delayed queue has already expired all its items.
> > In this case, could we just call foreach(submit) on all of them instead of
> > trying to reinsert them?

That is true only for the lowest wheel. Reinsert is necessary to make timing wheels work. A bucket from a higher wheel may contain tasks that have not yet expired (the tick time is longer in a higher wheel).


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 116
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line116>
> >
> > We need to make tickMs and wheelSize configurable.

What is the motivation? I don't think it is a good idea to allow users to configure them.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 253
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line253>
> >
> > Do we need to sync on refQueue as well?

I don't think so.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 288
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line288>
> >
> > It may be useful to return the number of purged items?

What is the use?


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala, line 67
> > <https://reviews.apache.org/r/31568/diff/1/?file=881362#file881362line67>
> >
> > latch.await(0, TimeUnit.SECONDS)?

This is to avoid a race condition. In Timer, tasks are run by a thread pool. "0" makes it more vulnerable. "3" makes it pretty safe.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 245
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line245>
> >
> > It seems we do not need to keep this as a class member variable, but
> > just compute the value in purge() on-the-fly every time.

This is a shared counter updated at multiple places. We need to have this to avoid unnecessary purge calls.

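
To illustrate the reinsert point from the Timer.scala reply above, here is a minimal sketch. It is not the code in the patch: Task, Bucket, and flush are hypothetical names, and the "due" test simply restates the condition from the review comment (expiration < bucket expiration + tickMs of the lowest wheel).

```scala
object TimingWheelSketch {
  // Hypothetical task: only the absolute expiration time matters here.
  final case class Task(name: String, expirationMs: Long)

  // Hypothetical bucket of a wheel whose tick is tickMs. It holds tasks
  // that expire somewhere in [startMs, startMs + tickMs).
  final case class Bucket(startMs: Long, tickMs: Long, tasks: List[Task])

  // Called when the bucket comes due, i.e. at time bucket.startMs.
  // Returns (tasks to submit now, tasks to reinsert into a lower wheel).
  // A task counts as due when it expires within one tick of the lowest
  // wheel (assumed condition, taken from the review comment).
  def flush(bucket: Bucket, lowestTickMs: Long): (List[Task], List[Task]) = {
    val now = bucket.startMs
    bucket.tasks.partition(task => task.expirationMs < now + lowestTickMs)
  }
}
```

With a lowest tick of 1 ms, a bucket from the lowest wheel always yields an empty reinsert list, which is the case the comment describes. A bucket starting at 40 ms in a wheel with a 20 ms tick may hold a task expiring at 55 ms; that task is not due when the bucket is pulled and has to go back into the lower wheel.
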

> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 72
> > <https://reviews.apache.org/r/31568/diff/1/?file=881358#file881358line72>
> >
> > It seems the task entry of the task will only be set once throughout
> > its lifetime; even when the task entry gets reinserted its correspondence
> > to the task will not change, right?
> >
> > If that is true we can just set the entry for the task in the
> > constructor of the task entry.

This sets TimerTaskEntry to TimerTask. TimerTask is created independently from a Timer, then enqueued to a Timer.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala, lines 29-33
> > <https://reviews.apache.org/r/31568/diff/1/?file=881361#file881361line29>
> >
> > Could we just add an atomic integer recording the list size and size()
> > function to TimerTaskList?

We size the list only in this test. Adding a counter to an individual list is unnecessary overhead.


- Yasuhiro


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
-----------------------------------------------------------


On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> -----------------------------------------------------------
>
> (Updated Feb. 28, 2015, 12:14 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1989
>     https://issues.apache.org/jira/browse/KAFKA-1989
>
>
> Repository: kafka
>
>
> Description
> -------
>
> new purgatory implementation
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5
>   core/src/main/scala/kafka/server/ReplicaManager.scala 586cf4caa95f58d5b2e6c7429732d25d2b3635c8
>   core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION
>
> Diff: https://reviews.apache.org/r/31568/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Yasuhiro Matsuda
>
>