-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/#review48283
-----------------------------------------------------------
Thanks for the patch. There are some unused imports. Detailed comments below.


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84747>

We may not be able to remove the read lock here. The issue is that this method accesses not only leaderReplicaIdOpt but also other internal data structures such as assignedReplicaMap. Without the lock, a read from the map could fail if it is being concurrently modified. In general, we can only skip the lock when reading a single internal value. Perhaps we can introduce another function isLeaderLocal() that returns a boolean and only needs to access leaderReplicaIdOpt. All callers would then first call leaderReplicaIfLocal and hold onto the leader replica; they can later use isLeaderLocal() to check whether the leader has changed in the meantime. (A small sketch is included after the Log.scala comments below.)


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84749>

It's possible that we get an UnknownOffsetMetadata during the conversion. In this case, we should probably set the HW to logEndOffset.


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84757>

Instead of using negation, could we do leaderHW.precedes(replica.logEndOffset)? Also, could we move && to the previous line?


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84750>

Not sure we need to copy here, since inSyncReplicas is immutable. A reference assignment should be enough.


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84812>

This seems to be an existing problem. If ack = -1, a safer check is HW >= requiredOffset. That way, if the ISR expands, the acked message is guaranteed to be present in any replica newly added to the ISR. Here is an example of the issue with the existing check. Suppose all replicas in the ISR are at offset 10, but the HW is still at 8, and we call checkEnoughReplicasReachOffset on offset 9. The check is satisfied and the message is considered committed. The HW will be advanced to 10 soon, but before that happens another replica whose LEO is at 8 can be added to the ISR. That replica won't have message 9, which was already acked as committed.


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84832>

Perhaps we could create the TopicPartitionRequestKey just once?


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84833>

Move && to the previous line?


core/src/main/scala/kafka/cluster/Replica.scala
<https://reviews.apache.org/r/23767/#comment84834>

Should we rename highWatermarkValue and logEndOffsetValue to highWatermarkMetadata and logEndOffsetMetadata?


core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/23767/#comment84698>

It would be a bit confusing to reason about the consistency between nextOffset and nextOffsetMetadata since they are not updated atomically. Could we keep just nextOffsetMetadata?


core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/23767/#comment84699>

Perhaps we can add a bit more detail to the comment. We are in the situation where startOffset is in range, but we can't find a single message whose offset is >= startOffset. One possibility is that all messages after startOffset have been deleted due to compaction. Is that the only case? Let's describe all situations in which this can happen.
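To make the isLeaderLocal() suggestion above concrete, here is a rough sketch. It only illustrates the shape of the split; Replica, localBrokerId, PartitionSketch and the map type are simplified stand-ins, not the actual Partition code.

    import scala.collection.mutable

    // Illustrative stand-in for kafka.cluster.Replica; only what the sketch needs.
    class Replica(val brokerId: Int)

    class PartitionSketch(localBrokerId: Int) {
      // A single volatile value: safe to read without taking the lock.
      @volatile var leaderReplicaIdOpt: Option[Int] = None
      // Internal mutable state: reads still need the lock (or an immutable snapshot).
      private val assignedReplicaMap = mutable.Map.empty[Int, Replica]

      // Touches only the single volatile field, so no read lock is required.
      def isLeaderLocal(): Boolean = leaderReplicaIdOpt.exists(_ == localBrokerId)

      // Callers grab the leader replica once here and hold on to it; later they
      // call isLeaderLocal() to detect whether leadership has changed since.
      def leaderReplicaIfLocal(): Option[Replica] = synchronized {
        leaderReplicaIdOpt match {
          case Some(id) if id == localBrokerId => assignedReplicaMap.get(id)
          case _ => None
        }
      }
    }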
core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/23767/#comment84915>

In DelayedProduce, we don't send the response immediately if one partition has an error. Should we do the same thing for DelayedFetch? Would that make the logic simpler?


core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/23767/#comment84836>

All of these should probably be private.


core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/23767/#comment84913>

Will this case ever happen? If so, could we add a comment on how it can happen?


core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84746>

This can be private.


core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/23767/#comment84910>

If ack is > 1, it won't be -1, so "but no = -1" is redundant.


core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/23767/#comment84814>

Would it be better to name it offsetOrdering?


core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/23767/#comment84687>

The ordering doesn't match that in the signature.


core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/23767/#comment84919>

Should we just use one constructor with defaults?


core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/23767/#comment84689>

Do we need the space after !?


core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84736>

This can be private.


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/23767/#comment84837>

TopicAndPartition -> (PartitionData, OffsetMetadata): it would probably be clearer to use a case class PartitionDataAndOffsetMetadata instead of a pair.


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/23767/#comment84839>

Perhaps it would be clearer to return a FetchResponseAndOffsetMetadata instead of a pair?


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/23767/#comment84754>

Would updateReplicaLEOAndHW be enough?


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/23767/#comment84912>

Do we need to log (topic, partition) twice?


core/src/main/scala/kafka/server/RequestKey.scala
<https://reviews.apache.org/r/23767/#comment84920>

Should we rename this to DelayedRequestKey?


core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84921>

These comments may need to be changed according to the comments below.


core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84718>

I actually think it's more intuitive to return true if the request is satisfied by the caller. Then we can assign a meaningful return value at the call site:

    val isSatisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(request)

Also, we need to explain this method better. How about the following comment? "Potentially add the request for watch on all keys. Return true iff the request is satisfied and the satisfaction is done by the caller." (A minimal sketch of this contract follows the comments below.)


core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84728>

key doesn't seem to be used.
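For the checkAndMaybeWatch discussion above, here is a minimal sketch of the suggested contract, assuming a satisfied flag on the delayed request; DelayedRequest, isSatisfiable and PurgatorySketch are simplified stand-ins, not the actual purgatory code.

    import java.util.concurrent.atomic.AtomicBoolean

    // Simplified stand-in: the real delayed request also carries watch keys,
    // response callbacks and an expiration time.
    class DelayedRequest(val isSatisfiable: () => Boolean) {
      val satisfied = new AtomicBoolean(false)
    }

    object PurgatorySketch {
      // Potentially add the request for watch on all keys (registration elided here).
      // Returns true iff the request is satisfied and the satisfaction was done by
      // this caller, i.e. this thread won the compare-and-set on the satisfied bit.
      def checkAndMaybeWatch(request: DelayedRequest): Boolean =
        request.isSatisfiable() && request.satisfied.compareAndSet(false, true)
    }

With this shape, only the thread that flips the bit sends the response; every other caller either sees the request as not yet satisfiable or loses the compare-and-set and leaves it to the watchers.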
core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84709>

Could we name this checkAndMaybeAdd and add the comment?


core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
<https://reviews.apache.org/r/23767/#comment84922>

Can this just be created as Map(a -> b)?


- Jun Rao


On July 21, 2014, 7:53 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23767/
> -----------------------------------------------------------
> 
> (Updated July 21, 2014, 7:53 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1430
>     https://issues.apache.org/jira/browse/KAFKA-1430
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Rebased on KAFKA-1462: 1. LogSegment.read() will also return fetch info, even
> if the corresponding message set is empty; 2. Purgatory checking satisfactory
> in checkAndMaybeWatch synchronously, and will only return false if this
> thread successfully set the satisfactory bit to true; 3. Remove the read lock
> on Partition's reading of the leaderOpt and epoch and making them volatile
> instead since these two functions are just single read; 4. Fix some minor
> issues in TestEndToEndLatency; 5. Other minor fixes
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a
>   core/src/main/scala/kafka/cluster/Partition.scala f2ca8562f833f09d96ec4bd37efcacf69cd84b2e
>   core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 0e64632210385ef63c2ad3445b55ac4f37a63df2
>   core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3
>   core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12
>   core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b
>   core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1
>   core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION
>   core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION
>   core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION
>   core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884
>   core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION
>   core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a
>   core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8
>   core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b
>   core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef
>   core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3
>   core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2
>   core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala cec1caecc51507ae339ebf8f3b8a028b12a1a056
>   core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3
>   core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31
>   core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3
>   core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b
> 
> Diff: https://reviews.apache.org/r/23767/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 