> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 104-105
> > <https://reviews.apache.org/r/31568/diff/3/?file=912738#file912738line104>
> >
> >     We probably should call forceComplete() first and only if it returns
> >     true, run onExpiration().
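The two orderings under discussion can be compared with a minimal sketch. It assumes that forceComplete() is a compare-and-set guard: it runs onComplete() and returns true only for the caller that actually completes the operation. SketchOperation, expireCurrentOrder, and expireSuggestedOrder are hypothetical names. They are not the patch's DelayedOperation API.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for a delayed operation (not the patch's
// DelayedOperation). Assumption: forceComplete() is a compare-and-set guard
// that runs onComplete() and returns true only for the caller that wins.
class SketchOperation(val log: ArrayBuffer[String]) {
  private val completed = new AtomicBoolean(false)

  def onComplete(): Unit = log += "onComplete"
  def onExpiration(): Unit = log += "onExpiration"

  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true }
    else false

  // Current order: the expiration callback runs first, even if another
  // path has already completed the operation.
  def expireCurrentOrder(): Unit = {
    onExpiration()
    forceComplete()
  }

  // Suggested order: run onExpiration() only if this call actually completed
  // the operation, at the cost of running it after onComplete().
  def expireSuggestedOrder(): Unit =
    if (forceComplete()) onExpiration()
}
```

Suppose another path has already completed the operation. expireCurrentOrder() still invokes onExpiration(), but expireSuggestedOrder() skips it. For an operation that really does time out, the suggested order runs onExpiration() after onComplete(). That conflicts with the "before completion" contract documented on onExpiration.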
The current order (onExpiration() before forceComplete()) came from the original ExpiredOperationReaper.expireNext(). Also, the comment on onExpiration says, "Call-back to execute when a delayed operation expires, but before completion." So I cannot call forceComplete before onExpiration. I think we can do a little refactoring to clean this up later.


> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, line 26
> > <https://reviews.apache.org/r/31568/diff/3/?file=912740#file912740line26>
> >
> >     Is Timer too general a name? Should we rename it to something like
> >     DelayedOperationTimer?

Timer is intended to be a general timer facility.


> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala, lines 55-57
> > <https://reviews.apache.org/r/31568/diff/3/?file=912745#file912745line55>
> >
> >     Actually, where is the logic to not increment the counter on
> >     reinserting existing tasks? TimerTaskList.add() seems to always increment
> >     the counter.

The constructor call, new TimerTaskEntry(task), removes an existing entry for this timer task (if any) and decrements the counter. I will add comments.


> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, line 77
> > <https://reviews.apache.org/r/31568/diff/3/?file=912743#file912743line77>
> >
> >     I was initially puzzled about how we synchronize between add() and
> >     advanceClock(). Then I realized that the synchronization is actually done
> >     in the caller in Timer. I was thinking of how to make this part clearer.
> >     One way is to add the documentation here. Another way is to move the
> >     read/write lock from Timer to here, and pass in needed data structures like
> >     delayQueue.

Yeah, it is a little puzzling. I went through similar thoughts. The tricky part is that a timer can have multiple wheels. Locking at the wheel level is also confusing and not good for performance.
So I concluded that the locking should be done at a higher level, i.e., in Timer. I will add more comments.


> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, line 52
> > <https://reviews.apache.org/r/31568/diff/3/?file=912743#file912743line52>
> >
> >     It would be useful to add that the timing wheel implementation is used
> >     to optimize the common case when operations are completed before they time
> >     out.

Yes. It is true that hierarchical timing wheels work especially well when operations are completed before they time out. But even when everything times out, they still have an advantage if there are many items in the timer. Their insert cost (including reinsert) and delete cost are O(m) and O(1) respectively, where m is the number of wheels. Priority-queue-based timers take O(log N) for both insert and delete, where N is the number of items in the queue. Anyway, I will add more comments.


> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, lines 40-44
> > <https://reviews.apache.org/r/31568/diff/3/?file=912743#file912743line40>
> >
> >     For the hierarchical part, let's say that u is 1 and n is 2. If current
> >     time is c, then the buckets at different levels are:
> >
> >     level   buckets
> >     1       [c,c]     [c+1,c+1]
> >     2       [c,c+1]   [c+2,c+3]
> >     3       [c,c+3]   [c+4,c+7]
> >
> >     So, at any given point of time, [c,c+1] at level 2 and [c,c+3] at level
> >     3 will never be used since those buckets are already covered in the lower
> >     levels.
> >
> >     This seems a bit wasteful. To remove that waste, we could choose to
> >     start the 2nd level at c+2 and the 3rd level at c+6, etc. Do we choose to
> >     use the same current time as the start time at all levels for simplicity?
> >     If so, this is probably fine since the larger the n, the less the waste.
> >     However, it's probably worth documenting that the buckets at different
> >     levels can overlap?

Yes, the code is simpler this way.
I'll add more comments.


> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 369
> > <https://reviews.apache.org/r/31568/diff/3/?file=912738#file912738line369>
> >
> >     Perhaps we should trigger a purge if the number of total purgeable
> >     operations (instead of the number of unique purgeable operations) is more
> >     than the purgeInterval. This can be estimated as
> >     watched/estimatedTotalOperations.get * (estimatedTotalOperations.get - delayed).

Would you explain why that is better? It would trigger a lot more purge calls, and the frequency of those calls would depend on how many keys each request has. When the average number of keys per operation is large, the total number of watchers can exceed the threshold even though there are only a few distinct operations to remove. That makes it hard to tune.


- Yasuhiro


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review78668
-----------------------------------------------------------


On April 1, 2015, 8:50 p.m., Yasuhiro Matsuda wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> -----------------------------------------------------------
> 
> (Updated April 1, 2015, 8:50 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1989
>     https://issues.apache.org/jira/browse/KAFKA-1989
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> new purgatory implementation
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5
>   core/src/main/scala/kafka/server/ReplicaManager.scala 44f0026e6676d8d764dd59dbcc6bb7bb727a3ba6
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION
> 
> Diff: https://reviews.apache.org/r/31568/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
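Two points from the thread can be illustrated with a minimal sketch. The first is the bucket layout in Jun's level/bucket table. The second is why the insert cost Yasuhiro cites grows with the number of wheels m rather than the number of items N. The sketch assumes, as the table does, that every level starts at the same current time c, with tick u and n buckets per wheel. TimingWheelLayout, bucketSpan, bucketRanges, and levelFor are hypothetical names, and this is not the patch's TimingWheel code.

```scala
// Hypothetical, simplified model of hierarchical timing-wheel layout; not the
// patch's TimingWheel. Assumes all levels share the same current time c
// (as in the review's table), tick u, and n buckets per wheel.
object TimingWheelLayout {

  // Width of one bucket at the given 1-based level: u * n^(level - 1).
  def bucketSpan(u: Long, n: Int, level: Int): Long =
    (1 until level).foldLeft(u)((span, _) => span * n)

  // Inclusive [start, end] ranges of the n buckets on the given level.
  def bucketRanges(c: Long, u: Long, n: Int, level: Int): Seq[(Long, Long)] = {
    val span = bucketSpan(u, n, level)
    (0 until n).map { i =>
      val start = c + i * span
      (start, start + span - 1)
    }
  }

  // Level that would hold a task expiring at `expiration`, or 0 if it is
  // already due (earlier than c + u). Walks up one level at a time until the
  // expiration fits inside that wheel's interval of n buckets, so the cost
  // grows with the number of levels, not with the number of tasks.
  def levelFor(c: Long, u: Long, n: Int, expiration: Long): Int =
    if (expiration < c + u) 0
    else {
      var level = 1
      var span = u
      while (expiration >= c + span * n) {
        span *= n
        level += 1
      }
      level
    }
}
```

With u = 1, n = 2 and c = 0, bucketRanges returns [0,0] [1,1] for level 1, [0,1] [2,3] for level 2, and [0,3] [4,7] for level 3, which matches the table. levelFor never places a task in the first bucket of an upper level, because the lower levels already cover that range. This is the overlap Jun describes.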