-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21588/
-----------------------------------------------------------

(Updated May 16, 2014, 10:55 p.m.)


Review request for kafka.


Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description (updated)
-------

1. Change the watch() API to checkAndMaybeWatch(). In that function, the 
purgatory tries to add the delayed request to the watcher list of each of its 
keys.

1.a. When a watcher list tries to add the delayed request, it first checks 
whether the request is already satisfied, and only adds the request if it is 
not satisfied yet.

1.b. If one of the watcher lists fails to add the request because it is already 
satisfied, checkAndMaybeWatch() returns immediately.

1.c. A request may still be watched in some lists after it has been satisfied 
and removed from the list of the key on which it was checked. To fix this, the 
expire reaper now also purges the watcher lists when it purges the delayed 
queue. Hence the logic becomes "when the purgatory has too many requests, purge 
both the queue and the watcher lists".

1.d. The purgatory size gauge is now the total size of the watcher lists plus 
the delayed queue size.
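
The check-then-watch flow in item 1 can be sketched roughly as below. This is 
only an illustration under stated assumptions, not the code in the patch: 
DelayedItem, Watchers, SketchPurgatory, checkAndMaybeAdd and purgeSatisfied are 
hypothetical names, and the delayed queue and the expire reaper are left out.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a delayed request watched under several keys.
class DelayedItem(val keys: Seq[String]) {
  @volatile var satisfied: Boolean = false
}

// Hypothetical per-key watcher list.
class Watchers {
  private val requests = mutable.ListBuffer.empty[DelayedItem]

  // 1.a: check satisfaction first, and add only if not yet satisfied.
  def checkAndMaybeAdd(item: DelayedItem,
                       isSatisfied: DelayedItem => Boolean): Boolean = synchronized {
    if (item.satisfied || isSatisfied(item)) {
      item.satisfied = true
      false
    } else {
      requests += item
      true
    }
  }

  // 1.c: drop entries that were satisfied through another key's list.
  def purgeSatisfied(): Int = synchronized {
    val kept = requests.filterNot(_.satisfied)
    val purged = requests.size - kept.size
    requests.clear()
    requests ++= kept
    purged
  }

  def size: Int = synchronized(requests.size)
}

// Hypothetical purgatory holding one watcher list per key.
class SketchPurgatory(isSatisfied: DelayedItem => Boolean) {
  private val watchersForKey = mutable.Map.empty[String, Watchers]

  // 1.b: forall stops at the first watcher list that refuses the item,
  // so the method returns false without visiting the remaining keys.
  def checkAndMaybeWatch(item: DelayedItem): Boolean =
    item.keys.forall { key =>
      val watchers = synchronized(watchersForKey.getOrElseUpdate(key, new Watchers))
      watchers.checkAndMaybeAdd(item, isSatisfied)
    }

  // Purge every watcher list; returns the number of entries removed.
  def purgeSatisfied(): Int =
    synchronized(watchersForKey.values.map(_.purgeSatisfied()).sum)

  // Watcher-list part of the size gauge in 1.d (the delayed queue is omitted).
  def watched: Int = synchronized(watchersForKey.values.map(_.size).sum)
}
```

In this sketch a caller that gets false back from checkAndMaybeWatch() would 
respond to the request right away instead of waiting for it to expire.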

2. Add the current log segment size into the append info.

3. Replica now maintains the following additional information:

a) current log active segment's size.
b) current log active segment's base offset.
c) current HW's corresponding segment physical position.
d) current HW's corresponding segment's base offset.

a) and b) are updated on log.append(); c) and d) are updated on 
maybeIncrementLeaderHW() by searching the log for the HW. (I have thought 
about optimizing this but could not come up with a better approach.)

4. The delayed fetch request now maintains for each of its fetching partitions:

a) fetch segment's physical position.
b) fetch segment's base offset.

5. A delayed fetch request is now satisfied in any of the following cases:

Case A: This broker is no longer the leader for ANY of the partitions it tries 
to fetch
Case B: The fetch offset is not on the fetchable segment of the log
Case C: The accumulated bytes from all the fetchable segments exceed the 
minimum bytes

For a follower fetch request, the fetchable segment is the log's active 
segment; for a consumer fetch request, it is the segment that contains the HW.

Cases B and C are checked using the segment physical position and base offset 
mentioned above.
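
A minimal sketch of the three cases, assuming each partition carries the 
segment base offset and physical position from items 3 and 4. The names 
SegmentPosition, PartitionFetchState and DelayedFetchSketch are hypothetical 
and do not come from the patch, and treating "exceeds" as >= is also an 
assumption.

```scala
// Hypothetical position in a log: a segment's base offset plus a byte
// position inside that segment.
case class SegmentPosition(segmentBaseOffset: Long, physicalPosition: Int)

// Hypothetical per-partition state of a delayed fetch request.
case class PartitionFetchState(
  isLeader: Boolean,              // is this broker still the partition leader?
  fetchPosition: SegmentPosition, // where the fetch starts (item 4)
  fetchableEnd: SegmentPosition   // active segment end (follower) or HW (consumer)
)

object DelayedFetchSketch {
  def isSatisfied(partitions: Seq[PartitionFetchState], minBytes: Int): Boolean = {
    // Case A: leadership was lost for at least one partition.
    val lostLeadership = partitions.exists(p => !p.isLeader)
    // Case B: the fetch offset is on a different segment than the fetchable one.
    val offFetchableSegment = partitions.exists { p =>
      p.fetchPosition.segmentBaseOffset != p.fetchableEnd.segmentBaseOffset
    }
    if (lostLeadership || offFetchableSegment) true
    else {
      // Case C: sum the bytes available on each fetchable segment.
      val accumulated = partitions.map { p =>
        (p.fetchableEnd.physicalPosition - p.fetchPosition.physicalPosition).toLong
      }.sum
      accumulated >= minBytes // assumption: ">=" rather than strict ">"
    }
  }
}
```

Comparing base offsets first is what makes the byte subtraction in Case C 
meaningful, since physical positions are only comparable within one segment.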

6. After HW update, both delayed produce and fetch requests are checked for 
satisfaction; after log.append, fetch requests are checked for satisfaction. 

In these cases:

a) the broker becomes leader
b) the leader shrinks the ISR

the HW may also move, but delayed fetch requests are not checked. (Is this 
expected?)

7. When checkAndMaybeWatch returns false for delayed produce/fetch, respond 
immediately.

8. Some other minor comment/logging changes.


Diffs
-----

  core/src/main/scala/kafka/cluster/Partition.scala 
518d2df5ae702d8c0937e1f9603fd11a54e24be8 
  core/src/main/scala/kafka/cluster/Replica.scala 
5e659b4a5c0256431aecc200a6b914472da9ecf3 
  core/src/main/scala/kafka/log/FileMessageSet.scala 
b2652ddbe2f857028d5980e29a008b2c614694a3 
  core/src/main/scala/kafka/log/Log.scala 
b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
  core/src/main/scala/kafka/log/LogCleaner.scala 
2faa196a4dc612bc634d5ff5f5f275d09073f13b 
  core/src/main/scala/kafka/log/LogSegment.scala 
0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
3b15254f32252cf824d7a292889ac7662d73ada1 
  core/src/main/scala/kafka/server/KafkaApis.scala 
0b668f230c8556fdf08654ce522a11847d0bf39b 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
6a56a772c134dbf1e70c1bfe067223009bfdbac8 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
c064c5c4cf1191335572da8a2caf5f95dce902c1 
  core/src/test/scala/unit/kafka/log/LogTest.scala 
1da1393983d4b20330e7c7f374424edd1b26f2a3 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
4f61f8469df99e02d6ce7aad897d10e158cca8fd 

Diff: https://reviews.apache.org/r/21588/diff/


Testing
-------


Thanks,

Guozhang Wang
