> On April 1, 2015, 6:46 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/server/DelayedOperation.scala, line 126 > > <https://reviews.apache.org/r/31568/diff/2/?file=901425#file901425line126> > > > > timeoutTimer is an implementation detail in ExpiredOperationReaper. > > Could we hide it inside ExpiredOperationReaper and expose needed apis?
I feel it is more natural to have timeoutTimer in DelayedOperationPurgatory just like watchersForKey belongs to it. I'd like to make ExpiredOperationReaper just a simple thread to make it easy to understand. To make it clearer I will remove timeoutTimer from the constructor argument of ExpiredOperationReaper. > On April 1, 2015, 6:46 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 199-202 > > <https://reviews.apache.org/r/31568/diff/2/?file=901425#file901425line199> > > > > Do we need to do that or could we just let the expirationReaper handle > > it? There is a race condition. If an operation is completed just before added to the timer, the opration stays in the timer until timeout because it missed the chance to call cancel(). It is better to remove completed operation right away. > On April 1, 2015, 6:46 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/utils/timer/Timer.scala, line 26 > > <https://reviews.apache.org/r/31568/diff/2/?file=901427#file901427line26> > > > > Do you think the defaults for tickMs and wheelSize are good enough that > > we don't need to expose them as configs? The current default seems good. But I want to keep this parameters. Timer is used only by the purgatory right now, but the Timer implementaion is generic and is possible to be used for other purposes. Some may need to use different parameters. > On April 1, 2015, 6:46 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/utils/timer/Timer.scala, lines 44-45 > > <https://reviews.apache.org/r/31568/diff/2/?file=901427#file901427line44> > > > > Could we use inLock() where applicable? I don't like to have a closure overhead here. Scala's closure creates a new instance of closure object every time. > On April 1, 2015, 6:46 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/utils/timer/Timer.scala, lines 54-55 > > <https://reviews.apache.org/r/31568/diff/2/?file=901427#file901427line54> > > > > Do we need to run this in a separate thread? Could we just run this in > > the expireReaper thread as before? In a timer implementation, this is a commmon practice. It ensures timely expirations regardless of the size of the timer tasks. To be as generic as possible, I would like to use a supplied executor service. If it is really really necessary to execute the operations in expirationReaper, we can write a custom ExecutorService that runs the task in the caller thread. But I don't think it is necessary. it is more advantageous to have a separate thread on multi-core machine. > On April 1, 2015, 6:46 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/utils/timer/Timer.scala, line 61 > > <https://reviews.apache.org/r/31568/diff/2/?file=901427#file901427line61> > > > > Do we need to pass in the timeoutMs or could be just hardcode the 200ms > > in poll()? The timemout is just so that we can shutdown the expiration > > reaper thread since we don't interrupt it. Again, I want to keep this parameter to make useful for other purposes not just the purgatory. > On April 1, 2015, 6:46 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/utils/timer/Timer.scala, lines 66-67 > > <https://reviews.apache.org/r/31568/diff/2/?file=901427#file901427line66> > > > > Would it be simpler to just handle one poll item and return? The outer > > loop will call this methold again on the next item. Also, poll() can block. > > So the expiration reaper thread may not be able to shut down if the queue > > is empty. advanceClock() should process all expired buckets while holding writeLock. Otherwise, I don't think it can maintain the consistency of the data structure. > On April 1, 2015, 6:46 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, line 72 > > <https://reviews.apache.org/r/31568/diff/2/?file=901430#file901430line72> > > > > It seems that all wheels at different hierachies always have the same > > startMs. Does that mean that the first bucket in the coarser level wheel is > > never used? At the beginning, yes. But it will be used later because a wheel is a circular buffer. > On April 1, 2015, 6:46 p.m., Jun Rao wrote: > > core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala, lines > > 55-59 > > <https://reviews.apache.org/r/31568/diff/2/?file=901432#file901432line55> > > > > Not quite sure what this is testing. It's not clear to me why the > > sharedCounter won't increase after add. Perhaps, we can add some comments. It is testing that reinserting the existing tasks doesn't change the task count. I will add comments. - Yasuhiro ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/#review78444 ----------------------------------------------------------- On March 20, 2015, 3:45 p.m., Yasuhiro Matsuda wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/31568/ > ----------------------------------------------------------- > > (Updated March 20, 2015, 3:45 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1989 > https://issues.apache.org/jira/browse/KAFKA-1989 > > > Repository: kafka > > > Description > ------- > > new purgatory implementation > > > Diffs > ----- > > core/src/main/scala/kafka/server/DelayedOperation.scala > e317676b4dd5bb5ad9770930e694cd7282d5b6d5 > core/src/main/scala/kafka/server/ReplicaManager.scala > c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 > core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION > core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION > core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION > core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala > 7a37617395b9e4226853913b8989f42e7301de7c > core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala > PRE-CREATION > core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION > > Diff: https://reviews.apache.org/r/31568/diff/ > > > Testing > ------- > > > Thanks, > > Yasuhiro Matsuda > >