> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 187-192
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line187>
> >
> >     TBD
>
> Guozhang Wang wrote:
>     Replace the TBD here: we can let Timer.addTimerTaskEntry and Timer.add
>     return Boolean instead of Unit, indicating whether the task has not expired
>     and was successfully added to the timer. Then we can change the above to
>
>     if (!operation.isComplete()) {
>       if (!timeoutTimer.add(operation)) {
>         operation.cancel()
>       }
>     }
>
> Yasuhiro Matsuda wrote:
>     An "expired" task is always successfully added to the timer and executed
>     immediately. Do you mean "completed" instead of "expired"? TimerTask does not
>     have a notion of "completion". I kept the "completion" concept out of the Timer
>     implementation since it is not essential to Timer functionality.

That makes sense. I guess I was just trying to avoid calling operation.isComplete
consecutively, which may actually not be a bad thing.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, line 68
> > <https://reviews.apache.org/r/31568/diff/1/?file=881356#file881356line68>
> >
> >     I think bucket.flush(reinsert) will always fail on all the items, since
> >     their expiration time will always be < bucket expiration + tickMs, i.e. the
> >     bucket returned from the delayed queue has already expired all its items.
> >     In this case, could we just call foreach(submit) on all of them instead of
> >     trying to reinsert them?
>
> Yasuhiro Matsuda wrote:
>     It is true only for the lowest wheel. Reinsert is necessary to make
>     timing wheels work. A bucket from a higher wheel may contain tasks that have
>     not expired (a tick time is longer in a higher wheel).
>
> Guozhang Wang wrote:
>     OK, I may be missing something here, but this is my reasoning:
>
>     The bucket is only returned from the delayed queue in line 62 if its
>     expiration time has passed currentTime. After that, at least the lowest wheel
>     will advance to its expiration time, and hence the add call within the reinsert
>     is doomed to fail, as task.expirationTime < wheel's time + tickMs.
>
> Yasuhiro Matsuda wrote:
>     If the expired bucket is from the lowest wheel, all tasks in the bucket
>     are expired. "reinsert" submits the tasks to a thread pool for execution.
>     If the expired bucket is from a higher wheel, tasks are either expired or
>     not expired. "reinsert" submits the expired tasks to a thread pool and moves
>     unexpired tasks to lower wheels.

Got it.
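
To check my own understanding of the reinsert behavior described above, here is a
minimal sketch. It is not the code from the patch: ReinsertSketch, Task, LowestWheel,
flush and demo are all hypothetical names, and the model assumes a single lowest wheel
with a 1 ms tick receiving a bucket from a higher wheel with a 20 ms tick.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified model for illustration only; these are not the
// classes from the patch under review.
object ReinsertSketch {
  final case class Task(name: String, expirationMs: Long)

  // Stand-in for the lowest wheel: add() refuses tasks that expire before the
  // end of the current tick, i.e. expirationMs < currentTimeMs + tickMs.
  final class LowestWheel(tickMs: Long, currentTimeMs: Long) {
    val pending: ListBuffer[Task] = ListBuffer.empty[Task]

    def add(task: Task): Boolean =
      if (task.expirationMs < currentTimeMs + tickMs) false
      else { pending += task; true }
  }

  // "reinsert" as described in the thread: try the wheel first, and submit
  // the task for execution only when the wheel reports it as expired.
  def flush(bucket: List[Task], wheel: LowestWheel, submit: Task => Unit): Unit =
    bucket.foreach(task => if (!wheel.add(task)) submit(task))

  // Returns (names executed immediately, names moved to the lower wheel).
  def demo(): (List[String], List[String]) = {
    val executed = ListBuffer.empty[String]
    // The clock has just advanced to 20 ms; the lowest wheel ticks every 1 ms.
    val wheel = new LowestWheel(tickMs = 1L, currentTimeMs = 20L)
    // A bucket from a higher wheel with a 20 ms tick covers [20, 40).
    val higherBucket = List(Task("a", 20L), Task("b", 35L))
    flush(higherBucket, wheel, task => { executed += task.name; () })
    (executed.toList, wheel.pending.map(_.name).toList)
  }
}
```

In this model, task "a" (expiring at 20 ms) fails the add and is submitted, while
task "b" (expiring at 35 ms) is accepted by the lower wheel. A bucket from the lowest
wheel itself would only ever hold tasks of the first kind, which is why
foreach(submit) would be enough there but not for higher wheels.
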


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 72
> > <https://reviews.apache.org/r/31568/diff/1/?file=881358#file881358line72>
> >
> >     It seems the task entry of the task will only be set once throughout
> >     its lifetime; even when the task entry gets reinserted, its correspondence
> >     to the task will not change, right?
> >
> >     If that is true, we can just set the entry for the task in the
> >     constructor of the task entry.
>
> Yasuhiro Matsuda wrote:
>     This sets the TimerTaskEntry on the TimerTask. A TimerTask is created
>     independently from a Timer, then enqueued to a Timer.
>
> Guozhang Wang wrote:
>     Yes, but can we move this line to line 119 of TimerTaskList.scala? Then
>     in line 46 of Timer, when we create the TimerTaskEntry with the passed-in
>     TimerTask, its entry field will be set automatically.
>
> Yasuhiro Matsuda wrote:
>     If a task already enqueued to a timer is enqueued again, intentionally or
>     unintentionally (=bug), what happens?
>     My intention here is to keep the data structure consistent in such a case.
>     setTimerTaskEntry removes the old entry if it exists.

This is true. I was originally confused about whether we ever need to re-enqueue,
but the previous comment made it clear to me now.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
-----------------------------------------------------------


On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> -----------------------------------------------------------
>
> (Updated Feb. 28, 2015, 12:14 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1989
>     https://issues.apache.org/jira/browse/KAFKA-1989
>
>
> Repository: kafka
>
>
> Description
> -------
>
> new purgatory implementation
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5
>   core/src/main/scala/kafka/server/ReplicaManager.scala 586cf4caa95f58d5b2e6c7429732d25d2b3635c8
>   core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION
>
> Diff: https://reviews.apache.org/r/31568/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Yasuhiro Matsuda
>
>