-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/#review49317
-----------------------------------------------------------



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/23767/#comment86280>

    If we want to satisfy the request immediately in this case, shouldn't we 
return true here?



core/src/main/scala/kafka/api/FetchResponse.scala
<https://reviews.apache.org/r/23767/#comment86281>

    Since we expose simple consumer as part of the api, this renaming is 
actually an api change. It would be good we keep the old api unchanged.



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment86347>

    This doesn't look quite right. When creating Replica, we initialize the 
offset of HW to what's in the checkpoint file. However, the other metadata 
(position and base offset) in LogOffsetMetadata is not initialized properly. 
Would it be better to let Replica take LogOffsetMetadata instead of long for HW?



core/src/main/scala/kafka/cluster/Replica.scala
<https://reviews.apache.org/r/23767/#comment86343>

    It's possible that HW is out of the range of the log. In this case, 
convertToOffsetMetadata() will throw an OffsetOutOfRangeException. When this 
happens, we should probably just set HW to 0 since we are not sure what the HW 
should be.



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/23767/#comment86340>

    We need to describe what offset is returned in FetchDataInfo. This will be 
the offset >= startOffset.



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/23767/#comment86344>

    We need to describe what offset is returned. This will be the offset >= 
startOffset. Also, need to explain what happens if offset is out of range.



core/src/main/scala/kafka/log/LogSegment.scala
<https://reviews.apache.org/r/23767/#comment86339>

    We need to describe what offset is returned in FetchDataInfo. This will be 
the offset >= startOffset.



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/23767/#comment86350>

    If we want to satisfy it immediately, should we return true here?



core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/23767/#comment86345>

    This is really the offsetDiff. With log compaction, not every offset has a 
message.



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/23767/#comment86351>

    Probably use match here too?



core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
<https://reviews.apache.org/r/23767/#comment86349>

    Is that to address the problem that the first message could be delayed due 
to the starting overhead in the consumer fetcher threads? Perhaps we can 
include that in the comment.


- Jun Rao


On July 31, 2014, 10:04 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23767/
> -----------------------------------------------------------
> 
> (Updated July 31, 2014, 10:04 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1430
>     https://issues.apache.org/jira/browse/KAFKA-1430
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Address Jun's comments round two: 1. I ended up not adding the read lock on 
> Partition.leaderReplicaIfLocal, since not only the delayed fetch but delayed 
> produce request also needs to call this function in checkSatisfied. On the 
> other hand, reading inconsistently in all corner cases should be harmless, 
> detail explanation is in the JIRA comments; 2. I kept the renaming of 
> PartitionData since it is not used in core, this class will be only used by 
> FetchResponse.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 
> 55a5982c5234f3ac10d4b4ea9fd7c4aa11d34154 
>   core/src/main/scala/kafka/api/FetchResponse.scala 
> d117f10f724b09d6deef0df3a138d28fc91aa13a 
>   core/src/main/scala/kafka/cluster/Partition.scala 
> 134aef9c88068443d4d465189f376dd78605b4f8 
>   core/src/main/scala/kafka/cluster/Replica.scala 
> 5e659b4a5c0256431aecc200a6b914472da9ecf3 
>   core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala 
> f8c1b4e674f7515c377c6c30d212130f1ff022dd 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
> 0e64632210385ef63c2ad3445b55ac4f37a63df2 
>   core/src/main/scala/kafka/log/Log.scala 
> b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
>   core/src/main/scala/kafka/log/LogCleaner.scala 
> afbeffc72e7d7706b44961aecf8519c5c5a3b4b1 
>   core/src/main/scala/kafka/log/LogSegment.scala 
> 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
> 3b15254f32252cf824d7a292889ac7662d73ada1 
>   core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedRequestKey.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
>   core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 0e22897cd1c7e45c58a61c3c468883611b19116d 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
> 75ae1e161769a020a102009df416009bd6710f4a 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 897783cb756de548a8b634876f729b63ffe9925e 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 
> 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
>   core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
> af4783646803e58714770c21f8c3352370f26854 
>   core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 
> 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef 
>   core/src/test/scala/other/kafka/StressTestLog.scala 
> 8fcd068b248688c40e73117dc119fa84cceb95b3 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 
> 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
>   core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
> 7d4c70ce651b1af45cf9bb69b974aa770de8e59d 
>   core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 
> 6b7603728ae5217565d68b92dd5349e7c6508f31 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 
> 1da1393983d4b20330e7c7f374424edd1b26f2a3 
>   core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 
> 6db245c956d2172cde916defdb0749081bf891fd 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
> e532c2826c96bfa47d9a3c41b4d71dbf69541eac 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
> 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
> 0ec120a4a953114e88c575dd6b583874371a09e3 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
> 9abf219f0efb1a020db9a6623e1672a1affd5cfc 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
> 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 
> 
> Diff: https://reviews.apache.org/r/23767/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>

Reply via email to