-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21588/
-----------------------------------------------------------
(Updated May 16, 2014, 10:55 p.m.)

Review request for kafka.

Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430

Repository: kafka

Description (updated)
-------

1. Change the watch() API to checkAndMaybeWatch(). In that function, the purgatory tries to add the delayed request to the watcher list of each of its keys.
1.a. When a watcher tries to add the delayed request, it first checks whether the request is already satisfied, and adds it only if it is not yet satisfied.
1.b. If one of the watchers fails to add the request because it is already satisfied, checkAndMaybeWatch() returns immediately.
1.c. Some requests may still be watched in other lists after they have been satisfied and removed from the list of the key under which they were checked. To fix this, the watcher lists are now also purged when the expiration reaper purges the delayed queue, so the logic becomes "when the purgatory has too many requests, purge both the queue and the watcher lists".
1.d. The purgatory size gauge is now the total size of the watcher lists plus the size of the delayed queue.

2. Add the current log segment size to the append info.

3. Replica now maintains the following additional information:
   a) the size of the log's current active segment;
   b) the base offset of the log's current active segment;
   c) the physical position of the current HW within its segment;
   d) the base offset of the segment containing the current HW.
   a) and b) are updated on log.append(); c) and d) are updated in maybeIncrementLeaderHW() by searching the log for the HW. (I have thought about optimizing this but could not come up with a better approach.)

4. The delayed fetch request now maintains, for each of its fetching partitions:
   a) the physical position of the fetch offset within the fetch segment;
   b) the base offset of the fetch segment.

5. A delayed fetch request is now satisfied if any of the following holds:
   Case A: This broker is no longer the leader for ANY of the partitions it tries to fetch.
   Case B: The fetch offset is not located on the fetchable segment of the log.
   Case C: The accumulated bytes from all the fetchable segments exceed the minimum bytes.
   For a follower fetch request, the fetchable segment is the log's active segment; for a consumer fetch request, it is the segment containing the HW. Cases B and C are checked using the segment physical positions and base offsets mentioned above.

6. After an HW update, both delayed produce and delayed fetch requests are checked for satisfaction; after log.append(), delayed fetch requests are checked. In the following cases the HW may also move, but we do not check delayed fetch requests (is this expected?):
   a) the broker becomes the leader;
   b) the leader shrinks the ISR.

7. When checkAndMaybeWatch() returns false for a delayed produce/fetch request, respond immediately.

8. Some other minor comment/logging changes.

Diffs
-----

  core/src/main/scala/kafka/cluster/Partition.scala 518d2df5ae702d8c0937e1f9603fd11a54e24be8
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3
  core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12
  core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1
  core/src/main/scala/kafka/server/KafkaApis.scala 0b668f230c8556fdf08654ce522a11847d0bf39b
  core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8
  core/src/main/scala/kafka/server/RequestPurgatory.scala c064c5c4cf1191335572da8a2caf5f95dce902c1
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd

Diff: https://reviews.apache.org/r/21588/diff/

Testing
-------

Thanks,

Guozhang Wang
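Item 1 above (checkAndMaybeWatch() with the combined purge) can be illustrated with a minimal single-lock Scala sketch. It is not the RequestPurgatory code in this patch. SketchRequest, SketchPurgatory and purgeSatisfied() are hypothetical names, and the sketch assumes Scala 2.13+ collections (`filterInPlace`).

```scala
import scala.collection.mutable

// Hypothetical stand-in for a delayed request (not Kafka's DelayedRequest):
// `keys` are its watcher keys, `checkSatisfied` stands in for the real check.
final class SketchRequest(val keys: Seq[String], val checkSatisfied: () => Boolean) {
  var satisfied: Boolean = false
}

// Hypothetical sketch of items 1.a-1.d, guarded by one coarse lock for simplicity.
final class SketchPurgatory {
  private val watchers = mutable.Map.empty[String, mutable.ArrayBuffer[SketchRequest]]
  private val delayedQueue = mutable.ArrayBuffer.empty[SketchRequest]

  /** Returns true if the request is watched under all its keys and queued,
    * false if it was found already satisfied (caller responds immediately). */
  def checkAndMaybeWatch(req: SketchRequest): Boolean = synchronized {
    val keys = req.keys.iterator
    var watching = true
    while (watching && keys.hasNext) {
      val key = keys.next()
      // 1.a: check satisfaction before adding to this key's watcher list.
      if (req.satisfied || req.checkSatisfied()) {
        req.satisfied = true
        watching = false // 1.b: stop at the first key where it is satisfied
      } else {
        watchers.getOrElseUpdate(key, mutable.ArrayBuffer.empty[SketchRequest]) += req
      }
    }
    if (watching) delayedQueue += req
    watching
  }

  /** 1.c: drop satisfied requests from the delay queue AND every watcher list. */
  def purgeSatisfied(): Int = synchronized {
    val before = size
    delayedQueue.filterInPlace(r => !r.satisfied)
    watchers.values.foreach(_.filterInPlace(r => !r.satisfied))
    before - size
  }

  /** 1.d: gauge value = entries in all watcher lists + delay queue size. */
  def size: Int = synchronized { watchers.values.map(_.size).sum + delayedQueue.size }
}
```

A request that is found satisfied on its second key stays in the first key's list until purgeSatisfied() runs, which is the situation item 1.c describes.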
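Item 3 c) and d) locate the HW's segment by searching the log. The Scala sketch below shows one way such a search could work. It assumes segments are kept in a sorted map keyed by base offset (a floor lookup) and that each segment knows its message sizes. SketchSegment, SegmentPosition and SketchLog are hypothetical names, and a real segment would consult its offset index rather than sum sizes.

```scala
import java.util.TreeMap

// Hypothetical, simplified segment: `sizes(i)` is the byte size of the message
// at offset baseOffset + i (real segments would use an offset index instead).
final case class SketchSegment(baseOffset: Long, sizes: Vector[Int]) {
  def nextOffset: Long = baseOffset + sizes.size
}

// Hypothetical holder for items 3c/3d: base offset of the segment containing
// an offset, plus that offset's physical (byte) position inside the segment.
final case class SegmentPosition(segmentBaseOffset: Long, position: Int)

final class SketchLog {
  private val segments = new TreeMap[java.lang.Long, SketchSegment]()

  def addSegment(s: SketchSegment): Unit = segments.put(s.baseOffset, s)

  /** Floor lookup: the segment with the greatest base offset <= `offset`, then
    * the byte position of `offset` in it (equal to the segment size at its end). */
  def positionOf(offset: Long): Option[SegmentPosition] =
    Option(segments.floorEntry(offset)).map(_.getValue).collect {
      case seg if offset <= seg.nextOffset =>
        val relative = (offset - seg.baseOffset).toInt
        SegmentPosition(seg.baseOffset, seg.sizes.take(relative).sum)
    }
}
```

Here positionOf(hw) would yield both the HW's segment base offset and its physical position. The cost is a logarithmic map lookup plus the in-segment search.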
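The item 5 criterion can be sketched as a pure Scala function over per-partition state. This is a hypothetical illustration, not the patch's delayed fetch code. LogPosition, PartitionFetchState and DelayedFetchCheck are made-up names. Case A reads "ANY" as "at least one partition", and Case C compares with `>=`; both readings are assumptions.

```scala
// Hypothetical position of an offset: its segment's base offset and its byte
// position within that segment (items 3 and 4).
final case class LogPosition(segmentBaseOffset: Long, position: Int)

// Hypothetical per-partition state for a delayed fetch. `fetchableEnd` is the
// log end for a follower fetch, or the HW for a consumer fetch.
final case class PartitionFetchState(
    isLeader: Boolean,         // does this broker still lead the partition?
    fetchPos: LogPosition,     // where the fetch offset sits
    fetchableEnd: LogPosition) // end of the fetchable data

object DelayedFetchCheck {
  /** Sketch of the item 5 criterion; any one case satisfies the request. */
  def isSatisfied(parts: Seq[PartitionFetchState], minBytes: Int): Boolean = {
    // Case A: leadership moved away for at least one partition (assumed reading).
    val caseA = parts.exists(p => !p.isLeader)
    // Case B: the fetch offset is on a different segment than the fetchable end.
    val caseB = parts.exists(p => p.fetchPos.segmentBaseOffset != p.fetchableEnd.segmentBaseOffset)
    // Case C: bytes between fetch position and fetchable end, summed over partitions.
    // Only evaluated when A and B fail, i.e. every pair shares a segment.
    lazy val accumulated: Long =
      parts.map(p => math.max(0L, (p.fetchableEnd.position - p.fetchPos.position).toLong)).sum
    caseA || caseB || accumulated >= minBytes
  }
}
```

Case B is checked before Case C, so the byte difference is only taken between positions on the same segment, where it is meaningful.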