-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21588/
-----------------------------------------------------------
(Updated June 6, 2014, 12:41 a.m.)
Review request for kafka.
Bugs: KAFKA-1430
https://issues.apache.org/jira/browse/KAFKA-1430
Repository: kafka
Description (updated)
-------
1. Change the watch() API to checkAndMaybeWatch(). In that function, the
purgatory tries to add the delayed request to each keyed watchers list.
a). When a watcher is about to add the delayed request, it first checks whether
the request is already satisfied, and only adds it if it is not satisfied yet.
b). If one of the watchers fails to add the request because it is already
satisfied, checkAndMaybeWatch() returns immediately.
c). The purgatory size gauge is now the total size of the watcher lists plus
the delayed queue size.
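The flow above can be sketched as follows. This is a hypothetical, simplified model; DelayedRequest, Watchers, and Purgatory here only mirror the idea of the patch and are not the real Kafka classes.

```java
import java.util.*;

class DelayedRequest {
    final List<String> keys;
    volatile boolean satisfied = false; // the real code evaluates the completion criterion

    DelayedRequest(List<String> keys) { this.keys = keys; }
}

class Watchers {
    private final List<DelayedRequest> requests = new ArrayList<>();

    // Add the request only if it is not already satisfied.
    synchronized boolean checkAndMaybeAdd(DelayedRequest r) {
        if (r.satisfied) return false;
        requests.add(r);
        return true;
    }

    synchronized int size() { return requests.size(); }
}

class Purgatory {
    private final Map<String, Watchers> watchersByKey = new HashMap<>();

    // Try to watch the request under every key; stop as soon as one watcher
    // finds the request already satisfied.
    boolean checkAndMaybeWatch(DelayedRequest r) {
        for (String key : r.keys) {
            Watchers w = watchersByKey.computeIfAbsent(key, k -> new Watchers());
            if (!w.checkAndMaybeAdd(r)) return false;
        }
        return true;
    }

    // Gauge sketch: total watcher-list size (the patch also adds the delayed queue size).
    int watchedSize() {
        return watchersByKey.values().stream().mapToInt(Watchers::size).sum();
    }
}
```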
2. Add a LogOffsetMetadata structure, which contains a) the message offset, b)
the segment file base offset, and c) the relative physical position in the
segment file.
Each replica then maintains log offset metadata for:
a) The current HW offset.
On the leader replica, the metadata includes all three values; on follower
replicas, the metadata only keeps the message offset (the others are just -1).
When a partition becomes the leader, it uses its HW message offset to
construct the other two values of the metadata by searching its log.
The HW offset is updated in the partition's maybeUpdateLeaderHW function.
b) The current log end offset.
Every replica maintains its own log end offset, which gets updated upon log
append.
The leader replica also maintains the other replicas' log end offset metadata,
which is updated from follower fetch requests.
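A minimal sketch of the structure described above; field and method names are illustrative, only the three components come from the description:

```java
class LogOffsetMetadata {
    final long messageOffset;
    final long segmentBaseOffset;        // -1 on followers
    final int relativePositionInSegment; // -1 on followers

    LogOffsetMetadata(long messageOffset, long segmentBaseOffset, int relativePositionInSegment) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePositionInSegment = relativePositionInSegment;
    }

    // Follower-style metadata: only the message offset is known.
    static LogOffsetMetadata messageOffsetOnly(long messageOffset) {
        return new LogOffsetMetadata(messageOffset, -1L, -1);
    }

    boolean onlyMessageOffset() { return segmentBaseOffset < 0; }

    // Two offsets lie on the same segment iff their base offsets match.
    boolean onSameSegment(LogOffsetMetadata that) {
        return segmentBaseOffset == that.segmentBaseOffset;
    }

    // Byte distance between two offsets on the same segment, e.g. when
    // accumulating fetchable bytes for the delayed-fetch min-bytes check.
    int positionDiff(LogOffsetMetadata that) {
        if (!onSameSegment(that))
            throw new IllegalArgumentException("offsets must be on the same segment");
        return relativePositionInSegment - that.relativePositionInSegment;
    }
}
```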
3. Move the readMessageSet logic from KafkaApis to ReplicaManager as part of
the server-side refactoring.
The log.read function now returns the fetch offset metadata along with the
message set read.
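The new read-path return value can be illustrated as below: the read call hands back the offset metadata of the fetch position together with the bytes read. All names here (FetchDataInfo, ToyLog, fixed-size messages) are stand-ins, not the real classes in the patch.

```java
class FetchDataInfo {
    final long messageOffset;
    final long segmentBaseOffset;
    final int relativePositionInSegment;
    final byte[] messageSetBytes;

    FetchDataInfo(long messageOffset, long segmentBaseOffset,
                  int relativePositionInSegment, byte[] messageSetBytes) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePositionInSegment = relativePositionInSegment;
        this.messageSetBytes = messageSetBytes;
    }
}

class ToyLog {
    // Toy read(): resolve the fetch offset to a byte position inside the
    // segment (fixed-size messages for simplicity), then return that
    // metadata along with the bytes read from there onward.
    static FetchDataInfo read(long fetchOffset, long segmentBase,
                              int bytesPerMessage, byte[] segmentData) {
        int position = (int) ((fetchOffset - segmentBase) * bytesPerMessage);
        byte[] bytes = java.util.Arrays.copyOfRange(segmentData, position, segmentData.length);
        return new FetchDataInfo(fetchOffset, segmentBase, position, bytes);
    }
}
```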
4. The delayed fetch request then maintains, for each of the partitions it is
fetching, the fetch log offset metadata retrieved from the readMessageSet()
call.
5. A delayed fetch request's satisfaction criterion is now:
a). This broker is no longer the leader for ANY of the partitions it tries to
fetch.
b). The fetch offset does not lie on the fetchable segment of the log.
c). The accumulated bytes from all the fetchable segments exceed the minimum
bytes.
For a follower fetch request, the fetchable segment is the log's active
segment; for a consumer fetch request, it is the segment corresponding to the
HW.
Checks for cases b) and c) use the fetch log offset metadata stored in the
delayed fetch request.
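A hedged sketch of the satisfaction check for cases a)-c) above. Everything is simplified: PartitionStatus stands in for the per-partition state the real DelayedFetch keeps, and segment membership is compared via base offsets as in the stored offset metadata.

```java
import java.util.List;

class PartitionStatus {
    final boolean leaderIsLocal;  // false -> case a)
    final long fetchSegmentBase;  // segment of the stored fetch offset
    final int fetchPosition;      // byte position of the stored fetch offset
    final long endSegmentBase;    // segment of the fetchable end (LEO or HW)
    final int endPosition;        // byte position of the fetchable end

    PartitionStatus(boolean leaderIsLocal, long fetchSegmentBase, int fetchPosition,
                    long endSegmentBase, int endPosition) {
        this.leaderIsLocal = leaderIsLocal;
        this.fetchSegmentBase = fetchSegmentBase;
        this.fetchPosition = fetchPosition;
        this.endSegmentBase = endSegmentBase;
        this.endPosition = endPosition;
    }
}

class DelayedFetchCheck {
    static boolean isSatisfied(List<PartitionStatus> partitions, int minBytes) {
        long accumulated = 0;
        for (PartitionStatus p : partitions) {
            if (!p.leaderIsLocal) return true;                       // case a): leadership moved
            if (p.fetchSegmentBase != p.endSegmentBase) return true; // case b): off the fetchable segment
            accumulated += Math.max(0, p.endPosition - p.fetchPosition);
        }
        return accumulated >= minBytes;                              // case c): enough bytes
    }
}
```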
6. The delayed producer request's satisfaction criterion remains the same: the
request is satisfied when the acks-specified number of replicas have
replicated the data for each partition.
7. The conditions for checking whether delayed produce/fetch requests can be
unblocked are now:
Whenever the leader's HW moves in partition.maybeUpdateLeaderHW, unblock
delayed produce/fetch.
Whenever a follower's log end offset moves, unblock delayed produce.
Whenever a local leader append finishes, unblock delayed (follower) fetch.
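The triggers above can be summarized as a tiny dispatch table. The event names and purgatory labels are illustrative; the patch invokes the real purgatories' check paths instead of returning labels.

```java
import java.util.*;

class UnblockRules {
    enum Event { HW_MOVED, FOLLOWER_LEO_MOVED, LEADER_APPEND_FINISHED }

    // Which delayed-request purgatories a given event should try to unblock.
    static Set<String> purgatoriesToUnblock(Event e) {
        switch (e) {
            case HW_MOVED:               return new HashSet<>(Arrays.asList("produce", "fetch"));
            case FOLLOWER_LEO_MOVED:     return Collections.singleton("produce");
            case LEADER_APPEND_FINISHED: return Collections.singleton("fetch"); // follower fetches
            default:                     return Collections.emptySet();
        }
    }
}
```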
8. In KafkaApis, when checkAndMaybeWatch returns false for a delayed
produce/fetch request, respond immediately.
In order to let the purgatory send a response either after checkAndMaybeWatch
returns false or after a delayed request is satisfied/expired, it needs access
to the request channel; this dependency should be removed once the refactoring
completes.
9. Move the purgatories, delayed requests, request keys, and metrics out of
KafkaApis as part of the server-side refactoring.
The replica manager needs to be initialized with these purgatories after
KafkaApis is constructed, since for now both need to access them. This should
also be removed once the refactoring completes.
10. Related test changes, plus minor comment/logging changes.
Diffs
-----
core/src/main/scala/kafka/api/FetchResponse.scala
d117f10f724b09d6deef0df3a138d28fc91aa13a
core/src/main/scala/kafka/api/ProducerResponse.scala
5a1d8015379b1f5d9130d9edca89544ee7dd0039
core/src/main/scala/kafka/cluster/Partition.scala
518d2df5ae702d8c0937e1f9603fd11a54e24be8
core/src/main/scala/kafka/cluster/Replica.scala
5e659b4a5c0256431aecc200a6b914472da9ecf3
core/src/main/scala/kafka/log/FileMessageSet.scala
b2652ddbe2f857028d5980e29a008b2c614694a3
core/src/main/scala/kafka/log/Log.scala
b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12
core/src/main/scala/kafka/log/LogCleaner.scala
2faa196a4dc612bc634d5ff5f5f275d09073f13b
core/src/main/scala/kafka/log/LogSegment.scala
0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8
core/src/main/scala/kafka/server/AbstractFetcherThread.scala
3b15254f32252cf824d7a292889ac7662d73ada1
core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION
core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION
core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION
core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION
core/src/main/scala/kafka/server/KafkaApis.scala
0b668f230c8556fdf08654ce522a11847d0bf39b
core/src/main/scala/kafka/server/KafkaServer.scala
c22e51e0412843ec993721ad3230824c0aadd2ba
core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION
core/src/main/scala/kafka/server/OffsetManager.scala
0e22897cd1c7e45c58a61c3c468883611b19116d
core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
75ae1e161769a020a102009df416009bd6710f4a
core/src/main/scala/kafka/server/ReplicaManager.scala
6a56a772c134dbf1e70c1bfe067223009bfdbac8
core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION
core/src/main/scala/kafka/server/RequestPurgatory.scala
3d0ff1e2dbd6a5c3587cffa283db70415af83a7b
core/src/main/scala/kafka/utils/DelayedItem.scala
d7276494072f14f1cdf7d23f755ac32678c5675c
core/src/test/scala/other/kafka/StressTestLog.scala
8fcd068b248688c40e73117dc119fa84cceb95b3
core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
9f04bd38be639cde3e7f402845dbe6ae92e87dc2
core/src/test/scala/unit/kafka/log/LogManagerTest.scala
d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3
core/src/test/scala/unit/kafka/log/LogTest.scala
1da1393983d4b20330e7c7f374424edd1b26f2a3
core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
558a5d6765a2b2c2fd33dc75ed31873a133d12c9
core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
2cd3a3faf7be2bbc22a892eec78c6e4c05545e18
core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
b349fce21e7c33cb5bd9ac80cce0b23c31e87525
core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
4f61f8469df99e02d6ce7aad897d10e158cca8fd
core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b
Diff: https://reviews.apache.org/r/21588/diff/
Testing
-------
Thanks,
Guozhang Wang