-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21588/
-----------------------------------------------------------
(Updated June 6, 2014, 12:41 a.m.)

Review request for kafka.

Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430

Repository: kafka

Description (updated)
-------

1. Change the watch() API to checkAndMaybeWatch(). In that function, the purgatory tries to add the delayed request to each keyed watcher list.
   a) Before adding the delayed request, each watcher first checks whether it is already satisfied, and only adds the request if it is not.
   b) If one of the watchers fails to add the request because it is already satisfied, checkAndMaybeWatch() returns immediately.
   c) The purgatory size gauge is now the watcher lists' total size plus the delayed queue size.

2. Add a LogOffsetMetadata structure, which contains a) the message offset, b) the segment file's base offset, and c) the relative physical position in the segment file. Each replica then maintains log offset metadata for:
   a) The current HW offset. On the leader replica the metadata includes all three values; on a follower replica it only keeps the message offset (the other two are -1). When a partition becomes the leader, it uses its HW message offset to construct the other two values by searching its logs. The HW offset is updated in the partition's maybeUpdateLeaderHW function.
   b) The current log end offset. Every replica maintains its own log end offset, which is updated upon log append. The leader replica also maintains the other replicas' log end offset metadata, which is updated from follower fetch requests.

3. Move the readMessageSet logic from KafkaApis to ReplicaManager as part of the server-side refactoring. The log.read function now returns the fetch offset metadata along with the message set read.

4. The delayed fetch request then maintains, for each of its fetched partitions, the fetch log offset metadata retrieved from the readMessageSet() call.

5. The delayed fetch request's satisfaction criteria are now:
   a) This broker is no longer the leader for ANY of the partitions it tries to fetch.
   b) The fetch offset is no longer on the fetchable segment of the log.
   c) The accumulated bytes from all the fetchable segments exceed the minimum bytes.
   For a follower fetch request, the fetchable segment is the log's active segment; for a consumer fetch request, it is the segment containing the HW. Cases b) and c) are checked using the fetch log offset metadata stored in the delayed fetch.

6. The delayed producer request's satisfaction criterion remains the same: the acks-specified number of replicas have replicated the data for each partition.

7. The conditions for checking whether delayed produce/fetch requests can be unblocked are now:
   - Whenever the leader's HW moves in partition.maybeUpdateLeaderHW, unblock delayed produce/fetch.
   - Whenever a follower's log end offset moves, unblock delayed produce.
   - Whenever a local leader append finishes, unblock delayed (follower) fetch.

8. In KafkaApis, when checkAndMaybeWatch returns false for delayed produce/fetch, respond immediately. For the purgatory to respond, either after checkAndMaybeWatch returns false or after a delayed request is satisfied/expired, it needs access to the request channel to send the response back; this needs to be removed after the refactoring completes.

9. Move the purgatories, delayed requests, request keys, and metrics out of KafkaApis as part of the server-side refactoring. The replica manager needs to be initialized with these purgatories after KafkaApis is constructed, since for now both need to access them. This also needs to be removed after the refactoring completes.

10. Related test changes, plus some minor comment/logging changes.
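The checkAndMaybeWatch() flow in point 1 could be sketched roughly as below. Watchers, DelayedRequest, and all method names here are simplified stand-ins, not the actual patch code:

```scala
import scala.collection.mutable

// Simplified stand-in for a delayed request; name is illustrative.
trait DelayedRequest { def isSatisfied: Boolean }

class Watchers {
  private val requests = mutable.ListBuffer[DelayedRequest]()
  // Point 1a: only add the request if it is not yet satisfied.
  def checkAndMaybeAdd(r: DelayedRequest): Boolean = synchronized {
    if (r.isSatisfied) false
    else { requests += r; true }
  }
  def size: Int = synchronized(requests.size)
}

class RequestPurgatory {
  private val watchersByKey = mutable.Map[String, Watchers]()
  private val delayedQueue = mutable.Queue[DelayedRequest]()

  // Point 1b: forall short-circuits, so we return false as soon as any
  // watcher finds the request already satisfied, letting the caller
  // (KafkaApis, point 8) respond immediately.
  def checkAndMaybeWatch(keys: Seq[String], r: DelayedRequest): Boolean = {
    val addedToAll = keys.forall { key =>
      watchersByKey.getOrElseUpdate(key, new Watchers).checkAndMaybeAdd(r)
    }
    if (addedToAll) delayedQueue.enqueue(r)
    addedToAll
  }

  // Point 1c: purgatory size = watcher lists' total size + delayed queue size.
  def purgatorySize: Int = watchersByKey.values.map(_.size).sum + delayedQueue.size
}
```

Note that a request watched under two keys is counted twice in the watcher lists plus once in the delayed queue, which is why the size gauge is described as the sum of both.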
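The LogOffsetMetadata structure from point 2 might look roughly like the following; the field and method names are assumptions based on the description above, not copied from the patch:

```scala
// Hypothetical sketch of the LogOffsetMetadata structure (point 2).
case class LogOffsetMetadata(
    messageOffset: Long,                 // a) message offset
    segmentBaseOffset: Long = -1L,       // b) base offset of the containing segment
    relativePositionInSegment: Int = -1  // c) physical position within that segment
) {
  // On a follower, only the message offset is known; the others stay -1.
  def onlyOffsetKnown: Boolean = segmentBaseOffset == -1L

  // Two offsets on the same segment can be compared by physical position.
  def onSameSegment(that: LogOffsetMetadata): Boolean =
    this.segmentBaseOffset == that.segmentBaseOffset

  def positionDiff(that: LogOffsetMetadata): Int = {
    require(onSameSegment(that), "offsets are on different segments")
    this.relativePositionInSegment - that.relativePositionInSegment
  }
}
```

Under this sketch, a follower's HW would be constructed as `LogOffsetMetadata(hwOffset)`, while the leader fills in all three fields by searching its log when it takes over leadership.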
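The three satisfaction cases in point 5 could be checked along these lines. OffsetPos is a minimal stand-in for the stored fetch offset metadata; all types and names are illustrative, not the actual DelayedFetch implementation:

```scala
// Minimal stand-in for the per-partition offset metadata kept by the
// delayed fetch (recorded at readMessageSet() time).
case class OffsetPos(segmentBaseOffset: Long, physicalPosition: Int)

case class FetchPartitionStatus(
    isLocalLeader: Boolean,
    fetchOffset: OffsetPos, // where this fetch would start reading
    endOffset: OffsetPos    // log end offset for followers, HW for consumers
)

def fetchSatisfied(partitions: Seq[FetchPartitionStatus], minBytes: Int): Boolean = {
  // Case a: this broker is no longer the leader for some partition, or
  // case b: the fetch offset is no longer on the fetchable segment.
  val caseAorB = partitions.exists { p =>
    !p.isLocalLeader ||
      p.fetchOffset.segmentBaseOffset != p.endOffset.segmentBaseOffset
  }
  // Case c: accumulated bytes available on the fetchable segments,
  // computed from the physical-position difference.
  val accumulated = partitions.map { p =>
    p.endOffset.physicalPosition - p.fetchOffset.physicalPosition
  }.sum
  caseAorB || accumulated >= minBytes
}
```

The point of keeping the physical position in the metadata is visible here: the available byte count falls out of a subtraction, with no extra log lookup at check time.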
Diffs
-----

  core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a
  core/src/main/scala/kafka/api/ProducerResponse.scala 5a1d8015379b1f5d9130d9edca89544ee7dd0039
  core/src/main/scala/kafka/cluster/Partition.scala 518d2df5ae702d8c0937e1f9603fd11a54e24be8
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3
  core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12
  core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1
  core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION
  core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION
  core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION
  core/src/main/scala/kafka/server/KafkaApis.scala 0b668f230c8556fdf08654ce522a11847d0bf39b
  core/src/main/scala/kafka/server/KafkaServer.scala c22e51e0412843ec993721ad3230824c0aadd2ba
  core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION
  core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a
  core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8
  core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c
  core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b349fce21e7c33cb5bd9ac80cce0b23c31e87525
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b

Diff: https://reviews.apache.org/r/21588/diff/

Testing
-------

Thanks,

Guozhang Wang