> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 187-192
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line187>
> >
> >     TBD
> 
> Guozhang Wang wrote:
>     Replace the TBD here: we can let Timer.addTimerTaskEntry and Timer.add 
> return Boolean instead of Unit indicating if the task has not expired and 
> successfully added to the timer. And then we can change above as
>     
>     if (!operation.isComplete()) {
>       if (!timeoutTimer.add(operation) {
>         operation.cancel()
>       }
>     }
> 
> Yasuhiro Matsuda wrote:
>     An "expired" task is always successfuly added to the timer and executed 
> immediately. Do you mean "completed" instead of "expired"? TimeTask do not 
> have notion of "completion". I kept the "completion" concept out of the Timer 
> implementation since it is not essential to Timer functionality.

That makes sense. I guess I was just trying to avoid calling 
operation.isComplete consecutively, which may actually not be a bad thing.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, line 68
> > <https://reviews.apache.org/r/31568/diff/1/?file=881356#file881356line68>
> >
> >     I think bucket.flush(reinsurt) will always fail on all the items since 
> > their expiration time will always < bucket expiration + ticketMs, i.e. the 
> > returned bucket from the delayed queue has already expired all its items. 
> > In this case, could we just call foreach(submit) on all of them instead of 
> > trying to reinsurt them?
> 
> Yasuhiro Matsuda wrote:
>     It is true only for the lowest wheel. Reinsert is necessary to make 
> timing wheels work. A bucket from a higher wheel may contain tasks not 
> expired (a tick time is longer in a higher wheel).
> 
> Guozhang Wang wrote:
>     OK, I may miss sth. here, but this is my reasoning:
>     
>     The bucket is only returned from delayed queue in line 62 if its 
> expiration time has passed currentTime, after that at least the lowest wheel 
> will advance to its expiration time, and hence add call within the reinsert 
> is doomed to fail as task.expirationTime < wheel's time + tickMs.
> 
> Yasuhiro Matsuda wrote:
>     If the expired bucket is from the lowest wheel, all tasks in the bucket 
> is expired. "reinsert" submits the task to a thread pool for execution.
>     If the expired bucket is from a higher wheel, tasks are either expired or 
> not expired. "reinsert" submits the expired tasks to a thread pool and move 
> unexpired tasks to lower wheels.

Got it.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 72
> > <https://reviews.apache.org/r/31568/diff/1/?file=881358#file881358line72>
> >
> >     It seems the task entry of the task will only be set once throughout 
> > its life time; even when the task entry gets reinsurted its correspondence 
> > to the task will not change, right?
> >     
> >     If that is true we can just set the entry for the task in the 
> > constructor of the task entry.
> 
> Yasuhiro Matsuda wrote:
>     This sets TimerTaskEntry to TimerTask. TimeTask is created independently 
> from a Timer, then enqueued to a Timer.
> 
> Guozhang Wang wrote:
>     Yes, but can we move this line to line 119 of TimerTaskList.scala? Then 
> in line 46 of Timer when we create the TimerTaskEntry with the passed in 
> TimerTask its entry field will be set automatically.
> 
> Yasuhiro Matsuda wrote:
>     If a task already enqueued to a timer is enqueued again intentionally or 
> unintentionally (=bug), what happens?
>     My intention here is to keep data structure consistent in such a case. 
> setTimerTaskEntry removes the old entry if exists.

This is true, I was originally confused about whether we ever need to 
re-enqueue, but the previous comment made it clear to me now.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
-----------------------------------------------------------


On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> -----------------------------------------------------------
> 
> (Updated Feb. 28, 2015, 12:14 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1989
>     https://issues.apache.org/jira/browse/KAFKA-1989
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> new purgatory implementation
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/server/DelayedOperation.scala 
> e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 586cf4caa95f58d5b2e6c7429732d25d2b3635c8 
>   core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
> 7a37617395b9e4226853913b8989f42e7301de7c 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala 
> PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31568/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
>

Reply via email to