> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote: > > core/src/main/scala/kafka/server/DelayedOperation.scala, line 116 > > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line116> > > > > We need to make tickMs and wheelSize configurable. > > Yasuhiro Matsuda wrote: > What is the motivation? I don't think it is a good idea to allow users to > configure them.
I am not concerning about user-configurability. The purgatory is used by multiple request types: produce, fetch and in the future rebalance, heartbeat and join group, different request type may need to set the tickMs and wheelSize differently. > On March 17, 2015, 8:56 p.m., Guozhang Wang wrote: > > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 187-192 > > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line187> > > > > TBD Replace the TBD here: we can let Timer.addTimerTaskEntry and Timer.add return Boolean instead of Unit indicating if the task has not expired and successfully added to the timer. And then we can change above as if (!operation.isComplete()) { if (!timeoutTimer.add(operation) { operation.cancel() } } > On March 17, 2015, 8:56 p.m., Guozhang Wang wrote: > > core/src/main/scala/kafka/server/DelayedOperation.scala, line 288 > > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line288> > > > > It may be useful to return #.purged items? > > Yasuhiro Matsuda wrote: > What is the use? At line 316 / 317 we could log on trace level whether the clock advance expired any tasks and the #.purged items. > On March 17, 2015, 8:56 p.m., Guozhang Wang wrote: > > core/src/main/scala/kafka/utils/timer/Timer.scala, line 68 > > <https://reviews.apache.org/r/31568/diff/1/?file=881356#file881356line68> > > > > I think bucket.flush(reinsurt) will always fail on all the items since > > their expiration time will always < bucket expiration + ticketMs, i.e. the > > returned bucket from the delayed queue has already expired all its items. > > In this case, could we just call foreach(submit) on all of them instead of > > trying to reinsurt them? > > Yasuhiro Matsuda wrote: > It is true only for the lowest wheel. Reinsert is necessary to make > timing wheels work. A bucket from a higher wheel may contain tasks not > expired (a tick time is longer in a higher wheel). OK, I may miss sth. here, but this is my reasoning: The bucket is only returned from delayed queue in line 62 if its expiration time has passed currentTime, after that at least the lowest wheel will advance to its expiration time, and hence add call within the reinsert is doomed to fail as task.expirationTime < wheel's time + tickMs. > On March 17, 2015, 8:56 p.m., Guozhang Wang wrote: > > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 72 > > <https://reviews.apache.org/r/31568/diff/1/?file=881358#file881358line72> > > > > It seems the task entry of the task will only be set once throughout > > its life time; even when the task entry gets reinsurted its correspondence > > to the task will not change, right? > > > > If that is true we can just set the entry for the task in the > > constructor of the task entry. > > Yasuhiro Matsuda wrote: > This sets TimerTaskEntry to TimerTask. TimeTask is created independently > from a Timer, then enqueued to a Timer. Yes, but can we move this line to line 119 of TimerTaskList.scala? Then in line 46 of Timer when we create the TimerTaskEntry with the passed in TimerTask its entry field will be set automatically. - Guozhang ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/#review76459 ----------------------------------------------------------- On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/31568/ > ----------------------------------------------------------- > > (Updated Feb. 28, 2015, 12:14 a.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1989 > https://issues.apache.org/jira/browse/KAFKA-1989 > > > Repository: kafka > > > Description > ------- > > new purgatory implementation > > > Diffs > ----- > > core/src/main/scala/kafka/server/DelayedOperation.scala > e317676b4dd5bb5ad9770930e694cd7282d5b6d5 > core/src/main/scala/kafka/server/ReplicaManager.scala > 586cf4caa95f58d5b2e6c7429732d25d2b3635c8 > core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION > core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION > core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION > core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION > core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala > 7a37617395b9e4226853913b8989f42e7301de7c > core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala > PRE-CREATION > core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION > > Diff: https://reviews.apache.org/r/31568/diff/ > > > Testing > ------- > > > Thanks, > > Yasuhiro Matsuda > >