> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, line 68
> > <https://reviews.apache.org/r/31568/diff/1/?file=881356#file881356line68>
> >
> >     I think bucket.flush(reinsert) will always fail on all the items since 
> > their expiration time will always be < bucket expiration + tickMs, i.e. the 
> > returned bucket from the delayed queue has already expired all its items. 
> > In this case, could we just call foreach(submit) on all of them instead of 
> > trying to reinsert them?

That is true only for the lowest wheel. Reinsertion is necessary for the timing 
wheels to work: a bucket from a higher wheel may contain tasks that have not yet 
expired, because the tick time is longer in a higher wheel.
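
The difference between the lowest wheel and a higher wheel can be sketched 
as follows. This is a simplified model under assumed names (Task, Bucket, 
flush, lowestTickMs are all hypothetical), not the classes in the patch:

```scala
// Simplified, hypothetical model of one bucket flush; not the patch's classes.
object ReinsertSketch {
  final case class Task(name: String, expirationMs: Long)

  // A bucket in a wheel whose tick is tickMs covers [startMs, startMs + tickMs).
  final case class Bucket(startMs: Long, tickMs: Long, tasks: List[Task])

  // Called when the clock reaches bucket.startMs. A task is due only if it
  // falls within the current tick of the lowest wheel; the rest must be
  // reinserted so that a lower wheel can time them more precisely.
  // Returns (tasks due now, tasks to reinsert).
  def flush(bucket: Bucket, lowestTickMs: Long): (List[Task], List[Task]) =
    bucket.tasks.partition(_.expirationMs < bucket.startMs + lowestTickMs)
}
```

With a lowest tick of 1 ms, a bucket from a 20 ms wheel that starts at 40 and 
holds tasks expiring at 40 and 55 yields one due task and one task to 
reinsert. A bucket from the 1 ms wheel can only hold tasks that are all due.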


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 116
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line116>
> >
> >     We need to make tickMs and wheelSize configurable.

What is the motivation? I don't think it is a good idea to allow users to 
configure them.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 253
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line253>
> >
> >     Does it need to sync on refQueue as well?

I don't think so.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 288
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line288>
> >
> >     It may be useful to return the number of purged items?

What is the use?


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala, line 67
> > <https://reviews.apache.org/r/31568/diff/1/?file=881362#file881362line67>
> >
> >     latch.await(0, TimeUnit.SECONDS)?

This is to avoid a race condition. In Timer, tasks are run by a thread pool, so 
a task may not have finished by the time the latch is checked. A timeout of 
"0" makes the test vulnerable to that race; "3" makes it pretty safe.

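A minimal sketch of that race, using only java.util.concurrent. The object 
and method names are hypothetical, and the 50 ms sleep merely stands in for 
the delay before a pool thread runs the task:

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Hypothetical illustration; not taken from TimerTest.
object LatchAwaitSketch {
  // Runs a task on a pool thread and reports whether the latch was
  // released within timeoutSec seconds.
  def runAndAwait(timeoutSec: Long): Boolean = {
    val pool = Executors.newSingleThreadExecutor()
    val latch = new CountDownLatch(1)
    try {
      pool.execute(new Runnable {
        def run(): Unit = {
          Thread.sleep(50) // stand-in for scheduling delay in the pool
          latch.countDown()
        }
      })
      // With a zero timeout this returns immediately, usually before the
      // pool thread has counted down.
      latch.await(timeoutSec, TimeUnit.SECONDS)
    } finally {
      pool.shutdown() // already-submitted work still completes
    }
  }
}
```

Here runAndAwait(3) waits for the task, while runAndAwait(0) checks the latch 
before the pool thread has had a chance to release it.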

> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 245
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line245>
> >
> >     It seems we do not need to keep this as a class member variable, but 
> > just compute the value in purge() on-the-fly every time.

This is a shared counter updated in multiple places. We need it to avoid 
unnecessary purge calls.
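
One way such a counter can gate purging is sketched below. The class, its 
method names, and the threshold rule are assumptions made for illustration; 
they are not the code in DelayedOperation.scala:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch: a shared estimate that several code paths bump,
// consulted cheaply to decide whether a purge is worth running.
final class PurgeGate(purgeInterval: Int) {
  private val estimatedTotal = new AtomicInteger(0)

  // Called from every place that starts watching an operation.
  def recordWatched(): Unit = {
    estimatedTotal.incrementAndGet()
    ()
  }

  // pending is the number of operations known to be still outstanding.
  // Returns true, and resets the estimate, only when enough completed
  // operations have probably accumulated to justify a purge.
  def shouldPurge(pending: Int): Boolean =
    if (estimatedTotal.get - pending > purgeInterval) {
      estimatedTotal.set(pending)
      true
    } else false
}
```

Computing the same value inside purge() would require calling purge() first, 
which is the cost the counter is meant to avoid.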


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 72
> > <https://reviews.apache.org/r/31568/diff/1/?file=881358#file881358line72>
> >
> >     It seems the task entry of the task will only be set once throughout 
> > its lifetime; even when the task entry gets reinserted its correspondence 
> > to the task will not change, right?
> >     
> >     If that is true we can just set the entry for the task in the 
> > constructor of the task entry.

This sets the TimerTaskEntry on the TimerTask. A TimerTask is created 
independently of a Timer and is then enqueued to a Timer.
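
The lifecycle described here can be sketched as follows. The types are 
simplified stand-ins with hypothetical members (setEntry, getEntry, add), 
not the definitions in the patch:

```scala
// Hypothetical, simplified stand-ins for the patch's types.
object TaskEntrySketch {
  trait TimerTask extends Runnable {
    val expirationMs: Long
    // No entry exists until the task is handed to a Timer.
    private var entry: Option[TimerTaskEntry] = None
    def setEntry(e: TimerTaskEntry): Unit = synchronized { entry = Some(e) }
    def getEntry: Option[TimerTaskEntry] = synchronized { entry }
  }

  final class TimerTaskEntry(val task: TimerTask)

  final class Timer {
    // Enqueueing wraps the task in an entry and links the two.
    def add(task: TimerTask): TimerTaskEntry = {
      val e = new TimerTaskEntry(task)
      task.setEntry(e)
      e
    }
  }
}
```

In this sketch a task has no entry when it is constructed and gains one only 
when Timer.add is called.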


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala, lines 
> > 29-33
> > <https://reviews.apache.org/r/31568/diff/1/?file=881361#file881361line29>
> >
> >     Could we just add an atomic integer recording the list size and size() 
> > function to TimerTaskList?

We compute the list size only in this test. Adding a counter to each 
individual list would be unnecessary overhead.
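
A test-side helper of this kind might look like the sketch below. TaskList 
is a hypothetical stand-in that exposes only foreach; the point is that the 
count lives in the test, not in the list:

```scala
// Hypothetical stand-in for a list type that exposes only foreach.
object SizeInTestSketch {
  final class TaskList[A] {
    private var items = List.empty[A]
    def add(a: A): Unit = { items = a :: items }
    def foreach(f: A => Unit): Unit = items.foreach(f)
  }

  // Test-only helper: count by traversal, so the list itself carries
  // no counter on its add and remove paths.
  def size[A](list: TaskList[A]): Int = {
    var n = 0
    list.foreach(_ => n += 1)
    n
  }
}
```

The traversal is linear in the list length, which is acceptable in a unit 
test and keeps production lists free of extra bookkeeping.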


- Yasuhiro


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
-----------------------------------------------------------


On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> -----------------------------------------------------------
> 
> (Updated Feb. 28, 2015, 12:14 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1989
>     https://issues.apache.org/jira/browse/KAFKA-1989
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> new purgatory implementation
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/server/DelayedOperation.scala 
> e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 586cf4caa95f58d5b2e6c7429732d25d2b3635c8 
>   core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
> 7a37617395b9e4226853913b8989f42e7301de7c 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala 
> PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31568/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
>
