-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21588/
-----------------------------------------------------------

(Updated June 6, 2014, 12:41 a.m.)


Review request for kafka.


Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description (updated)
-------

1. Change the watch() API to checkAndMaybeWatch(). In that function, the purgatory 
tries to add the delayed request to the watcher list of each of its keys.

a). When a watcher tries to add the delayed request, it first checks whether the 
request is already satisfied, and adds it only if it is not yet satisfied.

b). If any watcher fails to add the request because it is already satisfied, 
checkAndMaybeWatch() returns false immediately.

c). The purgatory size gauge is now the total size of the watcher lists plus 
the size of the delayed queue.
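The checkAndMaybeWatch() flow in a) to c) can be illustrated with a toy, in-memory purgatory. This is a sketch under stated assumptions, not the patch's RequestPurgatory: the names DelayedRequest and ToyPurgatory, the String keys, and the plain queue standing in for the delayed queue are all hypothetical.

```scala
import scala.collection.mutable

// Hypothetical delayed request: watched under one or more keys (e.g. partitions)
// and able to re-check its own satisfaction condition.
abstract class DelayedRequest(val keys: Seq[String]) {
  def isSatisfied: Boolean
}

// Toy purgatory illustrating checkAndMaybeWatch(); not the patch's RequestPurgatory.
class ToyPurgatory[T <: DelayedRequest] {
  private val watchersForKey = mutable.Map.empty[String, mutable.ListBuffer[T]]
  // Stand-in for the delayed (expiration) queue.
  private val delayedQueue = mutable.Queue.empty[T]

  private def watchersFor(key: String): mutable.ListBuffer[T] = synchronized {
    watchersForKey.getOrElseUpdate(key, mutable.ListBuffer.empty[T])
  }

  // Returns false as soon as one watcher finds the request already satisfied
  // (forall short-circuits); returns true once every key's watcher list holds it.
  def checkAndMaybeWatch(request: T): Boolean = {
    val watchedEverywhere = request.keys.forall { key =>
      val watchers = watchersFor(key)
      watchers.synchronized {
        // a) check satisfaction first and only add the request if it is still unsatisfied
        if (request.isSatisfied) false
        else { watchers += request; true }
      }
    }
    if (watchedEverywhere) synchronized { delayedQueue.enqueue(request) }
    watchedEverywhere
  }

  // c) size gauge: total length of all watcher lists plus the delayed queue length.
  def size: Int = synchronized {
    watchersForKey.values.map(_.size).sum + delayedQueue.size
  }
}
```

In this sketch, a request already added to earlier watcher lists before a later watcher found it satisfied is simply left there; purging such entries is not covered.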

2. Add a LogOffsetMetadata structure, which contains a) the message offset, 
b) the base offset of the segment file, and c) the relative physical position 
in the segment file.
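A minimal Scala sketch of such a structure follows. It assumes -1 as the "unknown" value for the two physical fields (matching the follower case described for the HW); the constant names and the helper methods (messageOffsetOnly, onOlderSegment, onSameSegment, positionDiff) are illustrative assumptions, not necessarily the API in LogOffsetMetadata.scala.

```scala
// Hypothetical sketch of the three-field offset metadata.
case class LogOffsetMetadata(
    messageOffset: Long,
    segmentBaseOffset: Long = LogOffsetMetadata.UnknownSegBaseOffset,
    relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {

  // True when only the message offset is known, as for a follower's HW.
  def messageOffsetOnly: Boolean =
    segmentBaseOffset == LogOffsetMetadata.UnknownSegBaseOffset &&
      relativePositionInSegment == LogOffsetMetadata.UnknownFilePosition

  private def requirePhysicalInfo(that: LogOffsetMetadata): Unit =
    require(!messageOffsetOnly && !that.messageOffsetOnly, "segment information is unknown")

  // True if this offset lies on an older segment than `that`.
  def onOlderSegment(that: LogOffsetMetadata): Boolean = {
    requirePhysicalInfo(that)
    segmentBaseOffset < that.segmentBaseOffset
  }

  def onSameSegment(that: LogOffsetMetadata): Boolean = {
    requirePhysicalInfo(that)
    segmentBaseOffset == that.segmentBaseOffset
  }

  // Byte distance from `that` to this offset; both must be on the same segment.
  def positionDiff(that: LogOffsetMetadata): Int = {
    require(onSameSegment(that), "offsets are on different segments")
    relativePositionInSegment - that.relativePositionInSegment
  }
}

object LogOffsetMetadata {
  val UnknownSegBaseOffset: Long = -1L
  val UnknownFilePosition: Int = -1
}
```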

Each replica then maintains the log offset metadata for

a) the current HW offset.

On the leader replica, the metadata includes all three values; on a follower 
replica, it keeps only the message offset (the other two values are -1).

When the local replica becomes the leader of a partition, it uses its HW 
message offset to construct the other two values of the metadata by searching 
its log.

The HW offset is updated in the partition's maybeUpdateLeaderHW function.

b) the current log end offset.

Every replica maintains its own log end offset, which is updated upon log 
append.

The leader replica also maintains the log end offset metadata of the other 
replicas, which is updated from the followers' fetch requests.
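The HW/LEO bookkeeping above can be sketched as follows, assuming the usual Kafka rule that the leader's HW is the smallest log end offset among the in-sync replicas and never moves backwards. LeaderReplicaState, OffsetMeta, and every method name other than maybeUpdateLeaderHW are hypothetical; ISR shrinking, locking, and follower-side HW handling are omitted.

```scala
// Trimmed-down offset metadata; -1 marks an unknown physical field (assumption).
case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long = -1L, relativePosition: Int = -1)

// Hypothetical leader-side bookkeeping for one partition.
class LeaderReplicaState(localId: Int, initialHw: OffsetMeta) {
  // Log end offset metadata per replica id: the leader's own (from local appends)
  // and the followers' (from their fetch requests).
  private var logEndOffsets = Map.empty[Int, OffsetMeta]
  private var inSyncReplicas = Set(localId)
  private var hw = initialHw

  def highWatermark: OffsetMeta = hw

  def addToIsr(replicaId: Int): Unit = inSyncReplicas += replicaId

  def onLocalAppend(newLeo: OffsetMeta): Unit = logEndOffsets += localId -> newLeo

  def onFollowerFetch(followerId: Int, leo: OffsetMeta): Unit = logEndOffsets += followerId -> leo

  // Sets the HW to the smallest LEO among in-sync replicas if that moves it forward.
  // Returns true when it moved, i.e. when delayed requests are worth re-checking.
  def maybeUpdateLeaderHW(): Boolean = {
    val isrLeos = inSyncReplicas.toSeq.flatMap(id => logEndOffsets.get(id))
    if (isrLeos.size < inSyncReplicas.size) false // some ISR member's LEO is still unknown
    else {
      val candidate = isrLeos.minBy(_.messageOffset)
      if (candidate.messageOffset > hw.messageOffset) { hw = candidate; true }
      else false
    }
  }
}
```

Because the candidate HW is taken from a LEO that the leader tracks with full metadata, the leader's HW keeps all three values, as the description requires.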

3. Move the readMessageSet logic from KafkaApis to ReplicaManager as part of 
the server-side refactoring. 

The log.read function now returns the fetch offset metadata along with the 
message set read.
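To show how a read can return the fetch offset metadata together with the messages, here is a toy segmented log. FetchDataInfo appears in the diff list as a new file, but its fields here, as well as OffsetMeta, ToySegment, ToyLog, and the linear position scan (instead of an offset index), are assumptions made for illustration. offsetMetadata() also shows the kind of log search a new leader needs to turn its bare HW message offset into full metadata.

```scala
// Trimmed-down offset metadata; -1 marks an unknown physical field.
case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long = -1L, relativePosition: Int = -1)

// What a read returns: the fetch offset's metadata plus the (offset, size) pairs read.
case class FetchDataInfo(fetchOffset: OffsetMeta, messages: Seq[(Long, Int)])

// Toy segment: a base offset plus the byte sizes of its consecutive messages.
case class ToySegment(baseOffset: Long, messageSizes: Vector[Int]) {
  // Byte position of `offset` inside this segment: the sum of the earlier message sizes.
  def positionOf(offset: Long): Int = messageSizes.take((offset - baseOffset).toInt).sum
}

class ToyLog(segments: Vector[ToySegment]) {
  // The segment holding `offset` is the last one whose base offset is <= offset.
  private def segmentFor(offset: Long): ToySegment =
    segments.reverseIterator.find(_.baseOffset <= offset)
      .getOrElse(throw new IllegalArgumentException(s"offset $offset is below the log start"))

  // Builds full metadata from a bare message offset by searching the log.
  def offsetMetadata(offset: Long): OffsetMeta = {
    val seg = segmentFor(offset)
    OffsetMeta(offset, seg.baseOffset, seg.positionOf(offset))
  }

  // Reads whole messages from one segment, up to maxBytes, and returns them
  // together with the fetch offset's metadata.
  def read(startOffset: Long, maxBytes: Int): FetchDataInfo = {
    val seg = segmentFor(startOffset)
    val sizes = seg.messageSizes.drop((startOffset - seg.baseOffset).toInt)
    val runningTotals = sizes.scanLeft(0)(_ + _).tail
    val count = runningTotals.takeWhile(_ <= maxBytes).size
    val messages = (0 until count).map(i => (startOffset + i, sizes(i)))
    FetchDataInfo(OffsetMeta(startOffset, seg.baseOffset, seg.positionOf(startOffset)), messages)
  }
}
```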

4. The delayed fetch request then keeps, for each partition it fetches, the 
fetch log offset metadata returned by the readMessageSet() call.

5. A delayed fetch request is now satisfied when any of the following holds:

a). This broker is no longer the leader for any one of the partitions it tries 
to fetch;
b). The fetch offset is not located on the fetchable segment of the log;
c). The bytes accumulated across all the fetchable segments exceed the minimum 
bytes.

For a follower fetch request, the fetchable segment is the log's active segment; 
for a consumer fetch request, it is the segment containing the HW.

Cases b) and c) are checked using the fetch log offset metadata stored in the 
delayed fetch.
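The satisfaction check in item 5, using the per-partition fetch offset metadata from item 4, might look roughly like this. It is a hedged sketch: all class and field names are hypothetical, partitions are identified by plain strings, and capping each partition's contribution at its maxBytes is an assumption about how the accumulated bytes are counted.

```scala
// Trimmed-down offset metadata (all three values are known on the leader).
case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long, relativePosition: Int)

// Item 4: what a delayed fetch remembers per partition (hypothetical field names).
case class FetchPartitionStatus(startOffset: OffsetMeta, maxBytes: Int)

// Hypothetical snapshot of this broker's state for one partition.
case class PartitionView(isLeader: Boolean, logEndOffset: OffsetMeta, highWatermark: OffsetMeta)

class ToyDelayedFetch(fetchStatus: Map[String, FetchPartitionStatus], minBytes: Int, fromFollower: Boolean) {

  def isSatisfied(lookup: String => Option[PartitionView]): Boolean = {
    // Per partition: Some(bytes available), or None when case a) or b) already holds.
    val available: Iterable[Option[Long]] = fetchStatus.map { case (tp, status) =>
      lookup(tp).filter(_.isLeader).flatMap { view => // None here: case a) (or unknown partition)
        // Fetchable segment: the active segment for followers, the HW's segment for consumers.
        val end = if (fromFollower) view.logEndOffset else view.highWatermark
        val start = status.startOffset
        if (start.segmentBaseOffset != end.segmentBaseOffset) None // case b)
        else Some(math.max(0, math.min(end.relativePosition - start.relativePosition, status.maxBytes)).toLong)
      }
    }
    // case c): bytes accumulated across the fetchable segments reach minBytes
    available.exists(_.isEmpty) || available.flatten.sum >= minBytes
  }
}
```

Because the stored start metadata already carries the segment base offset and file position, the check needs no log lookups, only a comparison against the current LEO or HW metadata.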

6. The satisfaction criterion for delayed produce requests remains the same: 
a request is satisfied when, for each partition, the number of replicas 
specified by the ACK setting has replicated the data.
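A simplified sketch of the unchanged produce criterion, assuming each partition tracks the offset its data must reach and the log end offsets of its in-sync replicas (leader included). Treating requiredAcks = -1 as "all in-sync replicas" is an assumption about the ACK semantics, and the names are hypothetical.

```scala
// Hypothetical per-partition produce state: the offset the appended data must
// reach, and the log end offsets of the in-sync replicas (leader included).
case class ProducePartitionState(requiredOffset: Long, isrLogEndOffsets: Seq[Long])

object ToyDelayedProduce {
  // requiredAcks == -1 is treated as "all in-sync replicas" (assumption).
  def partitionSatisfied(state: ProducePartitionState, requiredAcks: Int): Boolean = {
    val acked = state.isrLogEndOffsets.count(_ >= state.requiredOffset)
    if (requiredAcks < 0) acked == state.isrLogEndOffsets.size
    else acked >= requiredAcks
  }

  // The request is satisfied only when every partition is.
  def isSatisfied(partitions: Map[String, ProducePartitionState], requiredAcks: Int): Boolean =
    partitions.values.forall(partitionSatisfied(_, requiredAcks))
}
```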

7. Delayed produce/fetch requests are now checked for unblocking as follows:

Whenever the leader's HW moves in partition.maybeUpdateLeaderHW, try to unblock 
delayed produce/fetch requests.

Whenever a follower's log end offset moves, try to unblock delayed produce 
requests.

Whenever a local append on the leader finishes, try to unblock delayed 
(follower) fetch requests.
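The three triggers can be summarized as a small event-to-purgatory mapping. The trigger and kind names are hypothetical labels for the conditions listed above, not types from the patch.

```scala
// Hypothetical labels for the three triggers and the delayed-request kinds they re-check.
object UnblockRules {
  sealed trait Trigger
  case object LeaderHwMoved extends Trigger    // HW advanced in partition.maybeUpdateLeaderHW
  case object FollowerLeoMoved extends Trigger // a follower's log end offset moved
  case object LeaderAppendDone extends Trigger // a local append on the leader finished

  sealed trait DelayedKind
  case object Produce extends DelayedKind
  case object ConsumerFetch extends DelayedKind
  case object FollowerFetch extends DelayedKind

  // Which kinds of delayed requests are worth re-checking for a given trigger.
  def toCheck(trigger: Trigger): Set[DelayedKind] = trigger match {
    case LeaderHwMoved    => Set(Produce, ConsumerFetch, FollowerFetch)
    case FollowerLeoMoved => Set(Produce)
    case LeaderAppendDone => Set(FollowerFetch)
  }
}
```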

8. In KafkaApis, when checkAndMaybeWatch returns false for delayed 
produce/fetch, respond immediately.

To let the purgatory respond either after checkAndMaybeWatch returns false or 
when a delayed request is satisfied or expires, the purgatory needs access to 
the request channel to send the response back; this dependency needs to be 
removed after the refactoring completes.
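A toy version of this interim arrangement: the purgatory holds a stand-in for the request channel so it can send responses itself, and the KafkaApis-side handler responds immediately when checkAndMaybeWatch() returns false. ToyRequestChannel, ToyDelayed, ToyRespondingPurgatory, ToyHandler, and update() are all hypothetical, and expiration is left out.

```scala
import scala.collection.mutable

// Stand-in for the request channel: it just records the responses sent.
class ToyRequestChannel {
  val sent = mutable.ListBuffer.empty[String]
  def sendResponse(response: String): Unit = sent += response
}

// Hypothetical delayed request keyed on a single partition.
class ToyDelayed(val key: String, var satisfied: Boolean) {
  def response: String = s"response for $key"
}

// The purgatory keeps the request channel so it can send responses itself.
class ToyRespondingPurgatory(channel: ToyRequestChannel) {
  private val watched = mutable.Map.empty[String, mutable.ListBuffer[ToyDelayed]]

  // False means "already satisfied, not watched".
  def checkAndMaybeWatch(d: ToyDelayed): Boolean = synchronized {
    if (d.satisfied) false
    else {
      watched.getOrElseUpdate(d.key, mutable.ListBuffer.empty[ToyDelayed]) += d
      true
    }
  }

  def respond(d: ToyDelayed): Unit = channel.sendResponse(d.response)

  // Called when a trigger fires for `key`: respond to and stop watching satisfied requests.
  def update(key: String): Unit = synchronized {
    watched.get(key).foreach { list =>
      val (done, pending) = list.partition(_.satisfied)
      done.foreach(respond)
      list.clear()
      list ++= pending
    }
  }
}

// KafkaApis side: respond right away when the request is already satisfied.
object ToyHandler {
  def handle(purgatory: ToyRespondingPurgatory, d: ToyDelayed): Unit =
    if (!purgatory.checkAndMaybeWatch(d)) purgatory.respond(d)
}
```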

9. Move the purgatories, delayed requests, request keys, and metrics out of 
KafkaApis as part of the server-side refactoring.

The replica manager needs to be initialized with these purgatories after 
KafkaApis is constructed, since for now both need to access them. This needs 
to be removed after the refactoring completes.
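The temporary two-step wiring might look like the sketch below, assuming KafkaApis is still where the purgatories are instantiated: the replica manager is constructed without them and receives them once KafkaApis exists. The stub classes, ToyServerStartup, and initWithRequestPurgatory() are hypothetical names used only for illustration.

```scala
// Hypothetical stand-ins for the two purgatories.
class ProducerPurgatoryStub
class FetchPurgatoryStub

// Constructed without purgatories; they are supplied later.
class ToyReplicaManager {
  private var purgatories: Option[(ProducerPurgatoryStub, FetchPurgatoryStub)] = None

  def initWithRequestPurgatory(producer: ProducerPurgatoryStub, fetch: FetchPurgatoryStub): Unit =
    purgatories = Some((producer, fetch))

  def isReady: Boolean = purgatories.isDefined
}

// Assumption: for now KafkaApis creates the purgatories (they need the request channel).
class ToyKafkaApis(val replicaManager: ToyReplicaManager) {
  val producerPurgatory = new ProducerPurgatoryStub
  val fetchPurgatory = new FetchPurgatoryStub
}

object ToyServerStartup {
  def start(): (ToyReplicaManager, ToyKafkaApis) = {
    val replicaManager = new ToyReplicaManager
    val apis = new ToyKafkaApis(replicaManager)
    // Temporary second step: share the purgatories with the replica manager.
    replicaManager.initWithRequestPurgatory(apis.producerPurgatory, apis.fetchPurgatory)
    (replicaManager, apis)
  }
}
```

Once the refactoring completes, the purgatories could be created before both components and passed to their constructors, removing the mutable late-initialization step.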

10. Related test changes, plus some minor comment/logging changes.


Diffs
-----

  core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a
  core/src/main/scala/kafka/api/ProducerResponse.scala 5a1d8015379b1f5d9130d9edca89544ee7dd0039
  core/src/main/scala/kafka/cluster/Partition.scala 518d2df5ae702d8c0937e1f9603fd11a54e24be8
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3
  core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12
  core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1
  core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION
  core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION
  core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION
  core/src/main/scala/kafka/server/KafkaApis.scala 0b668f230c8556fdf08654ce522a11847d0bf39b
  core/src/main/scala/kafka/server/KafkaServer.scala c22e51e0412843ec993721ad3230824c0aadd2ba
  core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION
  core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a
  core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8
  core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c
  core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b349fce21e7c33cb5bd9ac80cce0b23c31e87525
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b

Diff: https://reviews.apache.org/r/21588/diff/


Testing
-------


Thanks,

Guozhang Wang
