-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21588/#review44998
-----------------------------------------------------------


Overall, the patch looks pretty good to me. Some comments below.


core/src/main/scala/kafka/api/FetchResponse.scala
<https://reviews.apache.org/r/21588/#comment79629>

    There is no need to specify val explicitly. A case class automatically makes
every constructor parameter a val.
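
    For example, a minimal sketch (the class and field names below are only
    illustrative, not taken from the patch):

        // declaring the parameters plainly...
        case class PartitionData(error: Short, hw: Long)
        // ...is equivalent to marking each of them as a val explicitly:
        case class PartitionDataExplicit(val error: Short, val hw: Long)
        // either way the fields are public, immutable members:
        //   PartitionData(0, 100L).hw   // 100L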



core/src/main/scala/kafka/api/ProducerResponse.scala
<https://reviews.apache.org/r/21588/#comment79630>

    Same as the above.



core/src/main/scala/kafka/log/FileMessageSet.scala
<https://reviews.apache.org/r/21588/#comment79633>

    Should this be a val instead of a function?



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/21588/#comment79634>

    Does time need to be a val?



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/21588/#comment79635>

    Hmm, when we get an empty ByteBufferMessageSet, it may still be important to
return the corresponding offsetMetadata.



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/21588/#comment79636>

    This can only happen for regular consumer fetch requests. So, returning an
UnknownOffset is ok since it won't be used for purgatory checking. We should
describe this in a comment.



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/21588/#comment79637>

    Perhaps this can be named convertToOffsetMetadata()?



core/src/main/scala/kafka/server/KafkaServer.scala
<https://reviews.apache.org/r/21588/#comment79640>

    Could this be done inside KafkaApis?



core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/21588/#comment79642>

    Could we make this a case class? Then equals() doesn't need to be
overridden.
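
    For example, a sketch based on the description of the structure (the field
    names are only illustrative, not necessarily the ones in the patch):

        case class LogOffsetMetadata(messageOffset: Long,
                                     segmentBaseOffset: Long,
                                     relativePositionInSegment: Int)

        // the compiler generates structural equals()/hashCode(), so
        // LogOffsetMetadata(5L, 0L, 42) == LogOffsetMetadata(5L, 0L, 42)   // true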



core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/21588/#comment79641>

    older() and peer() could be named better. How about offsetOnOlderSegment 
and offsetOnSameSegment?



core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/21588/#comment79643>

    Maybe positionDiff()?



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/21588/#comment79644>

    Do we need to change this to an override val?



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/21588/#comment79645>

    We will need to synchronize on t when doing the check (same as on line 187) to
avoid the race condition that could cause two responses to be sent for the same
request.
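
    A minimal sketch of the kind of guard I have in mind (t, satisfied and
    respond() are just placeholders here, not the actual names in the patch):

        // only one thread is allowed to flip the request to "satisfied" and respond
        val mayRespond = t synchronized {
          if (!t.satisfied) { t.satisfied = true; true } else false
        }
        if (mayRespond)
          respond(t)   // hence exactly one response is sent for this request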


- Jun Rao


On June 6, 2014, 12:41 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21588/
> -----------------------------------------------------------
> 
> (Updated June 6, 2014, 12:41 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1430
>     https://issues.apache.org/jira/browse/KAFKA-1430
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> 1. Change the watch() API to checkAndMaybeWatch(). In that function, the
> purgatory will try to add the delayed request to each keyed watcher list.
> 
> a). When the watcher is trying to add the delayed request, it first checks
> whether the request is already satisfied, and only adds it if it is not satisfied yet.
> 
> b). If one of the watchers fails to add the request because it is already
> satisfied, checkAndMaybeWatch() returns immediately.
> 
> c). The purgatory size gauge is now the total size of the watcher lists plus
> the delayed queue size.
> 
> 2. Add a LogOffsetMetadata structure, which contains a) Message offset, b) 
> Segment file base offset, c) Relative physical position in segment file.
> 
> Each replica then maintains the log offset metadata for
> 
> a) current HW offset. 
> 
> On the leader replica, the metadata includes all three values; on follower
> replicas, the metadata only keeps the message offset (the other two fields are just -1).
> 
> When a replica becomes the leader for a partition, it will use its HW message
> offset to construct the other two values of the metadata by searching its log.
> 
> The HW offset will be updated in the partition's maybeUpdateLeaderHW function.
> 
> b) current log end offset.
> 
> Every replica maintains its own log end offset, which gets updated upon log
> append.
> 
> The leader replica also maintains the other replicas' log end offset metadata,
> which is updated from the follower fetch requests.
> 
> 3. Move the readMessageSet logic from KafkaApis to ReplicaManager as part of 
> the server-side refactoring. 
> 
> The log.read function now returns the fetch offset metadata along with the 
> message set read.
> 
> 4. The delayed fetch request then maintains, for each of the partitions it is
> fetching, the fetch log offset metadata retrieved from the readMessageSet()
> call.
> 
> 5. The delayed fetch request's satisfaction criterion is now:
> 
> a). This broker is no longer the leader for ANY of the partitions it tries to
> fetch from
> b). The fetch offset is not on the fetchable segment of the log
> c). The accumulated bytes from all the fetchable segments exceed the minimum
> bytes
> 
> For follower fetch requests, the fetchable segment is the log's active segment;
> for consumer fetch requests, the fetchable segment is the segment corresponding
> to the HW.
> 
> Checking cases b) and c) uses the fetch log offset metadata stored in the
> delayed fetch request.
> 
> 6. The delayed produce request's satisfaction criterion remains the same: the
> acks-specified number of replicas have replicated the data for each partition.
> 
> 7. The conditions for checking whether delayed produce/fetch requests can be
> unblocked are now:
> 
> Whenever the leader's HW moves in partition.maybeUpdateLeaderHW, unblock delayed
> produce/fetch.
> 
> Whenever the follower's log end offset moves, unblock delayed produce.
> 
> Whenever the local leader append finishes, unblock delayed (follower) fetch.
> 
> 8. In KafkaApis, when checkAndMaybeWatch returns false for delayed 
> produce/fetch, respond immediately.
> 
> In order to let the purgatory respond either after checkAndMaybeWatch returns
> false or after some delayed request is satisfied/expired, it needs access to the
> request channel to send the response back; this needs to be removed after the
> refactoring completes.
> 
> 9. Move the purgatories, delayed requests, request keys, and metrics out of
> KafkaApis as part of the server-side refactoring.
> 
> The replica manager needs to be initialized with these purgatories after
> KafkaApis is constructed since, for now, both need to access them. This needs to
> be removed after the refactoring completes.
> 
> 10. Related test changes, plus some other minor comment/logging changes.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchResponse.scala 
> d117f10f724b09d6deef0df3a138d28fc91aa13a 
>   core/src/main/scala/kafka/api/ProducerResponse.scala 
> 5a1d8015379b1f5d9130d9edca89544ee7dd0039 
>   core/src/main/scala/kafka/cluster/Partition.scala 
> 518d2df5ae702d8c0937e1f9603fd11a54e24be8 
>   core/src/main/scala/kafka/cluster/Replica.scala 
> 5e659b4a5c0256431aecc200a6b914472da9ecf3 
>   core/src/main/scala/kafka/log/FileMessageSet.scala 
> b2652ddbe2f857028d5980e29a008b2c614694a3 
>   core/src/main/scala/kafka/log/Log.scala 
> b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
>   core/src/main/scala/kafka/log/LogCleaner.scala 
> 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
>   core/src/main/scala/kafka/log/LogSegment.scala 
> 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
> 3b15254f32252cf824d7a292889ac7662d73ada1 
>   core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> 0b668f230c8556fdf08654ce522a11847d0bf39b 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> c22e51e0412843ec993721ad3230824c0aadd2ba 
>   core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 0e22897cd1c7e45c58a61c3c468883611b19116d 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
> 75ae1e161769a020a102009df416009bd6710f4a 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 6a56a772c134dbf1e70c1bfe067223009bfdbac8 
>   core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 
> 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
>   core/src/main/scala/kafka/utils/DelayedItem.scala 
> d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/other/kafka/StressTestLog.scala 
> 8fcd068b248688c40e73117dc119fa84cceb95b3 
>   core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 
> 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
>   core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
> d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 
> 1da1393983d4b20330e7c7f374424edd1b26f2a3 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
> 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
> 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
> b349fce21e7c33cb5bd9ac80cce0b23c31e87525 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
> 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 
> 
> Diff: https://reviews.apache.org/r/21588/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>
