> On Aug. 1, 2014, 5:08 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/api/FetchResponse.scala, lines 28-29 > > <https://reviews.apache.org/r/23767/diff/5/?file=647101#file647101line28> > > > > Since we expose simple consumer as part of the api, this renaming is > > actually an api change. It would be good we keep the old api unchanged.
I agree that simple consumer and fetch responses are exposed. But I am not sure if this is exposed as well, let's discuss this offline. > On Aug. 1, 2014, 5:08 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/cluster/Partition.scala, lines 101-102 > > <https://reviews.apache.org/r/23767/diff/5/?file=647102#file647102line101> > > > > This doesn't look quite right. When creating Replica, we initialize the > > offset of HW to what's in the checkpoint file. However, the other metadata > > (position and base offset) in LogOffsetMetadata is not initialized > > properly. Would it be better to let Replica take LogOffsetMetadata instead > > of long for HW? This is actually fine, only leader replica needs the full HW metadata, which is constructed in Partition.makeLeader(). For other replicas just the message offset is sufficient. > On Aug. 1, 2014, 5:08 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/cluster/Replica.scala, lines 103-105 > > <https://reviews.apache.org/r/23767/diff/5/?file=647103#file647103line103> > > > > It's possible that HW is out of the range of the log. In this case, > > convertToOffsetMetadata() will throw an OffsetOutOfRangeException. When > > this happens, we should probably just set HW to 0 since we are not sure > > what the HW should be. Thinking about it, convertToOffsetMetadata() will never return LogOffsetMetadata.UnknownOffsetMetadata before. So we can catch the convertToOffsetMetadata and return UnknownOffsetMetadata, which is the same as setting HW to -1. - Guozhang ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23767/#review49317 ----------------------------------------------------------- On July 31, 2014, 10:04 p.m., Guozhang Wang wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/23767/ > ----------------------------------------------------------- > > (Updated July 31, 2014, 10:04 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1430 > https://issues.apache.org/jira/browse/KAFKA-1430 > > > Repository: kafka > > > Description > ------- > > Address Jun's comments round two: 1. I ended up not adding the read lock on > Partition.leaderReplicaIfLocal, since not only the delayed fetch but delayed > produce request also needs to call this function in checkSatisfied. On the > other hand, reading inconsistently in all corner cases should be harmless, > detail explanation is in the JIRA comments; 2. I kept the renaming of > PartitionData since it is not used in core, this class will be only used by > FetchResponse. > > > Diffs > ----- > > core/src/main/scala/kafka/api/FetchRequest.scala > 55a5982c5234f3ac10d4b4ea9fd7c4aa11d34154 > core/src/main/scala/kafka/api/FetchResponse.scala > d117f10f724b09d6deef0df3a138d28fc91aa13a > core/src/main/scala/kafka/cluster/Partition.scala > 134aef9c88068443d4d465189f376dd78605b4f8 > core/src/main/scala/kafka/cluster/Replica.scala > 5e659b4a5c0256431aecc200a6b914472da9ecf3 > core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala > f8c1b4e674f7515c377c6c30d212130f1ff022dd > core/src/main/scala/kafka/consumer/SimpleConsumer.scala > 0e64632210385ef63c2ad3445b55ac4f37a63df2 > core/src/main/scala/kafka/log/Log.scala > b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 > core/src/main/scala/kafka/log/LogCleaner.scala > afbeffc72e7d7706b44961aecf8519c5c5a3b4b1 > core/src/main/scala/kafka/log/LogSegment.scala > 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 > core/src/main/scala/kafka/server/AbstractFetcherThread.scala > 3b15254f32252cf824d7a292889ac7662d73ada1 > core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION > core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION > core/src/main/scala/kafka/server/DelayedRequestKey.scala PRE-CREATION > core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION > core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION > core/src/main/scala/kafka/server/KafkaApis.scala > fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 > core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION > core/src/main/scala/kafka/server/OffsetManager.scala > 0e22897cd1c7e45c58a61c3c468883611b19116d > core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala > PRE-CREATION > core/src/main/scala/kafka/server/ReplicaFetcherThread.scala > 75ae1e161769a020a102009df416009bd6710f4a > core/src/main/scala/kafka/server/ReplicaManager.scala > 897783cb756de548a8b634876f729b63ffe9925e > core/src/main/scala/kafka/server/RequestPurgatory.scala > 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b > core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala > af4783646803e58714770c21f8c3352370f26854 > core/src/main/scala/kafka/tools/TestEndToEndLatency.scala > 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef > core/src/test/scala/other/kafka/StressTestLog.scala > 8fcd068b248688c40e73117dc119fa84cceb95b3 > core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala > cd16ced5465d098be7a60498326b2a98c248f343 > core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala > 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 > core/src/test/scala/unit/kafka/log/LogManagerTest.scala > 7d4c70ce651b1af45cf9bb69b974aa770de8e59d > core/src/test/scala/unit/kafka/log/LogSegmentTest.scala > 6b7603728ae5217565d68b92dd5349e7c6508f31 > core/src/test/scala/unit/kafka/log/LogTest.scala > 1da1393983d4b20330e7c7f374424edd1b26f2a3 > core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala > 6db245c956d2172cde916defdb0749081bf891fd > core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala > e532c2826c96bfa47d9a3c41b4d71dbf69541eac > core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala > 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 > core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala > 0ec120a4a953114e88c575dd6b583874371a09e3 > core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala > 9abf219f0efb1a020db9a6623e1672a1affd5cfc > core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala > 4f61f8469df99e02d6ce7aad897d10e158cca8fd > core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala > b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b > > Diff: https://reviews.apache.org/r/23767/diff/ > > > Testing > ------- > > > Thanks, > > Guozhang Wang > >